Unverified commit 06ced4d1 authored by congqixia, committed by GitHub

Add DropVirtualChannel for DataCoord (#12361)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Parent c4f0837d
......@@ -319,7 +319,7 @@ func (c *ChannelStore) remove(nodeID int64) error {
func (c *ChannelStore) txn(opSet ChannelOpSet) error {
saves := make(map[string]string)
removals := make([]string, 0)
var removals []string
for _, update := range opSet {
for i, c := range update.Channels {
k := buildChannelKey(update.NodeID, c.Name)
......
......@@ -36,6 +36,7 @@ import (
const (
metaPrefix = "datacoord-meta"
segmentPrefix = metaPrefix + "/s"
channelRemovePrefix = metaPrefix + "/channel-removal"
handoffSegmentPrefix = "querycoord-handoff"
)
......@@ -334,6 +335,153 @@ func (m *meta) UpdateFlushSegmentsInfo(
return nil
}
// UpdateDropChannelSegmentInfo updates segment checkpoints and binlogs before the drop
// reusing SegmentInfo to pass segment id, binlogs, statslogs, deltalogs, start position and checkpoint
func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentInfo) error {
m.Lock()
defer m.Unlock()
modSegments := make(map[UniqueID]*SegmentInfo)
for _, seg2Drop := range segments {
segment := m.mergeDropSegment(seg2Drop)
if segment != nil {
modSegments[seg2Drop.GetID()] = segment
}
}
return m.batchSaveDropSegments(channel, modSegments)
}
// mergeDropSegment merges drop segment information with meta segments
func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo {
segment := m.segments.GetSegment(seg2Drop.ID)
// health check ensures idempotence
if segment == nil || !isSegmentHealthy(segment) {
log.Warn("UpdateDropChannel skipping nil or unhealthy", zap.Bool("is nil", segment == nil),
zap.Bool("isHealthy", isSegmentHealthy(segment)))
return nil
}
clonedSegment := segment.Clone()
clonedSegment.State = commonpb.SegmentState_Dropped
currBinlogs := clonedSegment.GetBinlogs()
var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog {
for _, binlog := range binlogs {
if id == binlog.GetFieldID() {
return binlog
}
}
return nil
}
// binlogs
for _, tBinlogs := range seg2Drop.GetBinlogs() {
fieldBinlogs := getFieldBinlogs(tBinlogs.GetFieldID(), currBinlogs)
if fieldBinlogs == nil {
currBinlogs = append(currBinlogs, tBinlogs)
} else {
fieldBinlogs.Binlogs = append(fieldBinlogs.Binlogs, tBinlogs.Binlogs...)
}
}
clonedSegment.Binlogs = currBinlogs
// statlogs
currStatsLogs := clonedSegment.GetStatslogs()
for _, tStatsLogs := range seg2Drop.GetStatslogs() {
fieldStatsLog := getFieldBinlogs(tStatsLogs.GetFieldID(), currStatsLogs)
if fieldStatsLog == nil {
currStatsLogs = append(currStatsLogs, tStatsLogs)
} else {
fieldStatsLog.Binlogs = append(fieldStatsLog.Binlogs, tStatsLogs.Binlogs...)
}
}
clonedSegment.Statslogs = currStatsLogs
// deltalogs
clonedSegment.Deltalogs = append(clonedSegment.Deltalogs, seg2Drop.GetDeltalogs()...)
// start position
if seg2Drop.GetStartPosition() != nil {
clonedSegment.StartPosition = seg2Drop.GetStartPosition()
}
// checkpoint
if seg2Drop.GetDmlPosition() != nil {
clonedSegment.DmlPosition = seg2Drop.GetDmlPosition()
}
clonedSegment.currRows = seg2Drop.currRows
return clonedSegment
}
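To make the merge rule above concrete, here is a minimal sketch (not part of this commit; the field IDs and log paths are hypothetical, and find mirrors the getFieldBinlogs closure defined above) of how reported binlogs fold into the existing ones by FieldID:
// Sketch: fold reported binlogs into existing ones by FieldID.
find := func(id int64, list []*datapb.FieldBinlog) *datapb.FieldBinlog {
	for _, b := range list {
		if b.GetFieldID() == id {
			return b
		}
	}
	return nil
}
existing := []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"log-a"}}}
reported := []*datapb.FieldBinlog{
	{FieldID: 1, Binlogs: []string{"log-b"}}, // same field -> entries appended
	{FieldID: 2, Binlogs: []string{"log-c"}}, // new field  -> new entry added
}
for _, r := range reported {
	if f := find(r.GetFieldID(), existing); f != nil {
		f.Binlogs = append(f.Binlogs, r.Binlogs...)
	} else {
		existing = append(existing, r)
	}
}
// existing now holds: {FieldID:1 Binlogs:[log-a log-b]}, {FieldID:2 Binlogs:[log-c]}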
// batchSaveDropSegments saves dropped segments' info with the channel removal flag
// since the channel unwatching operation is not atomic here
// ** the removal flag is always saved with the last batch
// ** the last batch must contain at least one segment
// 1. when a failure occurs between batches, the failover mechanism will continue from the earliest checkpoint of this channel
// since the flag is not marked yet, the data node can re-consume the drop collection msg
// 2. when a failure occurs between saving meta and unwatching the channel, the removal flag shall be checked before letting a datanode watch this channel
func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*SegmentInfo) error {
// etcd limits the number of operations per transaction to 128; since the segment number might be enormous, we shall split
// all save operations into batches
// since the removal flag shall always go with the last batch, the last batch can hold at most maxOperationsPerTxn - 1 segments
for len(modSegments) > maxOperationsPerTxn-1 {
err := m.saveDropSegmentAndRemove(channel, modSegments, false, func(kv map[string]string, modSegments map[int64]*SegmentInfo) bool {
// batch filled or only one segment left
// since the last batch must contain at least one segment
return len(kv) == maxOperationsPerTxn || len(modSegments) == 1
})
if err != nil {
return err
}
}
// removal flag should be saved with the last batch
return m.saveDropSegmentAndRemove(channel, modSegments, true, func(_ map[string]string, _ map[int64]*SegmentInfo) bool { return false })
}
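A worked example of the batch split (a standalone sketch, assuming maxOperationsPerTxn is 128 per the etcd limit noted above; the segment count is hypothetical):
package main

import "fmt"

func main() {
	const maxOperationsPerTxn = 128 // etcd's per-transaction operation limit, per the comment above
	remaining := 300                // hypothetical number of segments to save

	for remaining > maxOperationsPerTxn-1 {
		batch := maxOperationsPerTxn
		if remaining-batch < 1 { // the last batch must keep at least one segment
			batch = remaining - 1
		}
		remaining -= batch
		fmt.Printf("txn: %d segment keys, no removal flag\n", batch)
	}
	fmt.Printf("final txn: %d segment keys + 1 removal-flag key\n", remaining)
	// Output:
	// txn: 128 segment keys, no removal flag
	// txn: 128 segment keys, no removal flag
	// final txn: 44 segment keys + 1 removal-flag key
}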
func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*SegmentInfo, withFlag bool, stopper func(kv map[string]string, modSegment map[int64]*SegmentInfo) bool) error {
kv := make(map[string]string)
update := make([]*SegmentInfo, 0, maxOperationsPerTxn)
for id, s := range modSegments {
key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID())
delete(modSegments, id)
segBytes, err := proto.Marshal(s.SegmentInfo)
if err != nil {
return fmt.Errorf("DataCoord UpdateDropChannelSegmentInfo segmentID:%d, marshal failed:%w", s.GetID(), err)
}
kv[key] = string(segBytes)
update = append(update, s)
if stopper(kv, modSegments) {
break
}
}
if withFlag {
// add the removal flag into meta, guarding against failures of the non-atomic channel removal
removalFlag := buildChannelRemovePath(channel)
kv[removalFlag] = ""
}
err := m.saveKvTxn(kv)
if err != nil {
log.Warn("Failed to txn save segment info batch for DropChannel", zap.Error(err))
return err
}
// update memory info
for _, s := range update {
m.segments.SetSegment(s.GetID(), s)
}
return nil
}
// FinishRemoveChannel removes the channel removal flag after the whole procedure is finished
func (m *meta) FinishRemoveChannel(channel string) error {
key := buildChannelRemovePath(channel)
return m.client.Remove(key)
}
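A minimal sketch of the failover check described in the batchSaveDropSegments comment: before letting a datanode watch a channel again, the coordinator can probe the removal flag. channelHasRemovalFlag is a hypothetical helper added here for illustration; buildChannelRemovePath and the kv.TxnKV LoadWithPrefix signature are assumed from this package.
// Sketch: probe the removal flag left behind by an interrupted DropVirtualChannel.
// channelHasRemovalFlag is a hypothetical helper, not part of this commit.
func channelHasRemovalFlag(client kv.TxnKV, channel string) (bool, error) {
	keys, _, err := client.LoadWithPrefix(buildChannelRemovePath(channel))
	if err != nil {
		return false, err
	}
	return len(keys) > 0, nil
}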
// ListSegmentFiles lists all segments' logs
func (m *meta) ListSegmentFiles() []string {
m.RLock()
......@@ -758,6 +906,11 @@ func buildQuerySegmentPath(collectionID UniqueID, partitionID UniqueID, segmentI
return fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, collectionID, partitionID, segmentID)
}
// buildChannelRemovePath builds the vchannel removal flag path
func buildChannelRemovePath(channel string) string {
return fmt.Sprintf("%s/%s", channelRemovePrefix, channel)
}
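For example, with the prefixes defined earlier in this file, the flag for a hypothetical vchannel name resolves to:
// buildChannelRemovePath("by-dev-rootcoord-dml_0_1v0") returns
// "datacoord-meta/channel-removal/by-dev-rootcoord-dml_0_1v0"
// (the channel name is a placeholder; the prefix comes from channelRemovePrefix above)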
// buildSegment is a utility function for composing a datapb.SegmentInfo struct with the provided info
func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) *SegmentInfo {
info := &datapb.SegmentInfo{
......@@ -772,6 +925,7 @@ func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueI
}
func isSegmentHealthy(segment *SegmentInfo) bool {
return segment.GetState() != commonpb.SegmentState_NotExist &&
return segment.GetState() != commonpb.SegmentState_SegmentStateNone &&
segment.GetState() != commonpb.SegmentState_NotExist &&
segment.GetState() != commonpb.SegmentState_Dropped
}
......@@ -515,7 +515,8 @@ func Test_meta_SetSegmentCompacting(t *testing.T) {
map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
ID: 1,
State: commonpb.SegmentState_Flushed,
},
isCompacting: false,
},
......@@ -563,18 +564,21 @@ func Test_meta_GetSegmentsOfCollection(t *testing.T) {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
State: commonpb.SegmentState_Growing,
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
State: commonpb.SegmentState_Flushed,
},
},
},
......@@ -588,12 +592,14 @@ func Test_meta_GetSegmentsOfCollection(t *testing.T) {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
State: commonpb.SegmentState_Flushed,
},
},
{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
State: commonpb.SegmentState_Growing,
},
},
},
......
......@@ -520,12 +520,14 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
InsertChannel: "ch2",
State: commonpb.SegmentState_Flushed,
},
},
},
......@@ -555,6 +557,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
InsertChannel: "ch2",
State: commonpb.SegmentState_Growing,
},
},
},
......
......@@ -346,6 +346,7 @@ func TestGetInsertBinlogPaths(t *testing.T) {
},
},
},
State: commonpb.SegmentState_Growing,
}
err := svr.meta.AddSegment(NewSegmentInfo(info))
assert.Nil(t, err)
......@@ -372,6 +373,8 @@ func TestGetInsertBinlogPaths(t *testing.T) {
},
},
},
State: commonpb.SegmentState_Growing,
}
err := svr.meta.AddSegment(NewSegmentInfo(info))
assert.Nil(t, err)
......@@ -450,7 +453,8 @@ func TestGetSegmentInfo(t *testing.T) {
defer closeTestServer(t, svr)
segInfo := &datapb.SegmentInfo{
ID: 0,
ID: 0,
State: commonpb.SegmentState_Flushed,
}
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
......@@ -467,7 +471,8 @@ func TestGetSegmentInfo(t *testing.T) {
defer closeTestServer(t, svr)
segInfo := &datapb.SegmentInfo{
ID: 0,
ID: 0,
State: commonpb.SegmentState_Flushed,
}
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
......@@ -816,6 +821,7 @@ func TestSaveBinlogPaths(t *testing.T) {
ID: segment.id,
CollectionID: segment.collectionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
......@@ -897,6 +903,7 @@ func TestSaveBinlogPaths(t *testing.T) {
ID: 1,
CollectionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
},
})
assert.Nil(t, err)
......@@ -915,6 +922,143 @@ func TestSaveBinlogPaths(t *testing.T) {
})
}
func TestDropVirtualChannel(t *testing.T) {
t.Run("normal DropVirtualChannel", func(t *testing.T) {
spyCh := make(chan struct{}, 1)
svr := newTestServer(t, nil, SetSegmentManager(&spySegmentManager{spyCh: spyCh}))
defer closeTestServer(t, svr)
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0})
type testSegment struct {
id UniqueID
collectionID UniqueID
}
segments := make([]testSegment, 0, maxOperationsPerTxn) // test batch overflow
for i := 0; i < maxOperationsPerTxn; i++ {
segments = append(segments, testSegment{
id: int64(i),
collectionID: 0,
})
}
for idx, segment := range segments {
s := &datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
}
if idx%2 == 0 {
s.Binlogs = []*datapb.FieldBinlog{
{FieldID: 1},
}
s.Statslogs = []*datapb.FieldBinlog{
{FieldID: 1},
}
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
}
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
require.Nil(t, err)
ctx := context.Background()
req := &datapb.DropVirtualChannelRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
ChannelName: "ch1",
Segments: make([]*datapb.DropVirtualChannelSegment, 0, maxOperationsPerTxn),
}
for _, segment := range segments {
seg2Drop := &datapb.DropVirtualChannelSegment{
SegmentID: segment.id,
CollectionID: segment.collectionID,
Field2BinlogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []string{
"/by-dev/test/0/1/2/1/Allo1",
"/by-dev/test/0/1/2/1/Allo2",
},
},
},
Field2StatslogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []string{
"/by-dev/test/0/1/2/1/stats1",
"/by-dev/test/0/1/2/1/stats2",
},
},
},
Deltalogs: []*datapb.DeltaLogInfo{
{
RecordEntries: 1,
DeltaLogPath: "/by-dev/test/0/1/2/1/delta1",
},
},
CheckPoint: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 10,
}
req.Segments = append(req.Segments, seg2Drop)
}
resp, err := svr.DropVirtualChannel(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
<-spyCh
err = svr.channelManager.Watch(&channel{"ch1", 0})
require.Nil(t, err)
// resend
resp, err = svr.DropVirtualChannel(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("with channel not matched", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
require.Nil(t, err)
resp, err := svr.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{
ChannelName: "ch2",
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestDataNodeTtChannel(t *testing.T) {
genMsg := func(msgType commonpb.MsgType, ch string, t Timestamp) *msgstream.DataNodeTtMsg {
return &msgstream.DataNodeTtMsg{
......
......@@ -384,6 +384,70 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return resp, nil
}
// DropVirtualChannel notifies DataCoord that a vchannel is dropped
// and the request contains the remaining data logs & checkpoints to update
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
resp := &datapb.DropVirtualChannelResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if s.isClosed() {
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
channel := req.GetChannelName()
log.Debug("receive DropVirtualChannel request",
zap.String("channel name", channel))
// validate
nodeID := req.GetBase().GetSourceID()
if !s.channelManager.Match(nodeID, channel) {
FailResponse(resp.Status, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
return resp, nil
}
segments := make([]*SegmentInfo, 0, len(req.GetSegments()))
for _, seg2Drop := range req.GetSegments() {
info := &datapb.SegmentInfo{
ID: seg2Drop.GetSegmentID(),
CollectionID: seg2Drop.GetCollectionID(),
InsertChannel: channel,
Binlogs: seg2Drop.GetField2BinlogPaths(),
Statslogs: seg2Drop.GetField2StatslogPaths(),
Deltalogs: seg2Drop.GetDeltalogs(),
StartPosition: seg2Drop.GetStartPosition(),
DmlPosition: seg2Drop.GetCheckPoint(),
NumOfRows: seg2Drop.GetNumOfRows(),
}
segment := NewSegmentInfo(info)
segments = append(segments, segment)
}
err := s.meta.UpdateDropChannelSegmentInfo(channel, segments)
if err != nil {
log.Error("Update Drop Channel segment info failed", zap.String("channel", channel), zap.Error(err))
resp.Status.Reason = err.Error()
return resp, nil
}
log.Debug("DropVChannel plan to remove", zap.String("channel", channel))
err = s.channelManager.RemoveChannel(channel)
if err != nil {
log.Warn("DropVChannel failed to RemoveChannel", zap.String("channel", channel), zap.Error(err))
}
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
// clean up removal flag
s.meta.FinishRemoveChannel(channel)
// no compaction is triggered in the Drop procedure
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
// GetComponentStates returns DataCoord's current state
func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
nodeID := common.NotRegisteredID
......
......@@ -675,3 +675,21 @@ func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
}
return ret.(*milvuspb.GetFlushStateResponse), err
}
// DropVirtualChannel drops virtual channel in datacoord.
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
client, err := c.getGrpcClient()
if err != nil {
return nil, err
}
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.DropVirtualChannel(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.DropVirtualChannelResponse), err
}
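A usage sketch for this wrapper (illustrative only, not code from this commit; the channel name is a placeholder and the error handling is an assumption):
// Sketch: dropping a vchannel through the DataCoord grpc client wrapper.
func dropChannelExample(ctx context.Context, client *Client) error {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	resp, err := client.DropVirtualChannel(ctx, &datapb.DropVirtualChannelRequest{
		ChannelName: "by-dev-rootcoord-dml_0_1v0", // placeholder vchannel
	})
	if err != nil {
		return err // communication error
	}
	if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
		return errors.New(resp.GetStatus().GetReason()) // server-side failure
	}
	return nil
}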
......@@ -116,6 +116,11 @@ func (m *MockDataCoordClient) WatchChannels(ctx context.Context, req *datapb.Wat
func (m *MockDataCoordClient) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushStateResponse, error) {
return &milvuspb.GetFlushStateResponse{}, m.err
}
func (m *MockDataCoordClient) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest, opts ...grpc.CallOption) (*datapb.DropVirtualChannelResponse, error) {
return &datapb.DropVirtualChannelResponse{}, m.err
}
func Test_NewClient(t *testing.T) {
proxy.Params.InitOnce()
......@@ -203,6 +208,9 @@ func Test_NewClient(t *testing.T) {
r20, err := client.WatchChannels(ctx, nil)
retCheck(retNotNil, r20, err)
r21, err := client.DropVirtualChannel(ctx, nil)
retCheck(retNotNil, r21, err)
}
client.getGrpcClient = func() (datapb.DataCoordClient, error) {
......
......@@ -305,3 +305,8 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return s.dataCoord.GetFlushState(ctx, req)
}
// DropVirtualChannel drops the virtual channel in datacoord
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
return s.dataCoord.DropVirtualChannel(ctx, req)
}
......@@ -53,6 +53,7 @@ type MockDataCoord struct {
compactionPlansResp *milvuspb.GetCompactionPlansResponse
watchChannelsResp *datapb.WatchChannelsResponse
getFlushStateResp *milvuspb.GetFlushStateResponse
dropVChanResp *datapb.DropVirtualChannelResponse
}
func (m *MockDataCoord) Init() error {
......@@ -155,6 +156,10 @@ func (m *MockDataCoord) GetFlushState(ctx context.Context, req *milvuspb.GetFlus
return m.getFlushStateResp, m.err
}
func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
return m.dropVChanResp, m.err
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
ctx := context.Background()
......@@ -321,6 +326,15 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, resp)
})
t.Run("DropVirtualChannel", func(t *testing.T) {
server.dataCoord = &MockDataCoord{
dropVChanResp: &datapb.DropVirtualChannelResponse{},
}
resp, err := server.DropVirtualChannel(ctx, nil)
assert.Nil(t, err)
assert.NotNil(t, resp)
})
err = server.Stop()
assert.Nil(t, err)
}
......
......@@ -403,6 +403,10 @@ func (m *MockDataCoord) GetFlushState(ctx context.Context, req *milvuspb.GetFlus
return nil, nil
}
func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
return &datapb.DropVirtualChannelResponse{}, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockProxy struct {
MockBase
......
......@@ -40,6 +40,7 @@ service DataCoord {
rpc WatchChannels(WatchChannelsRequest) returns (WatchChannelsResponse) {}
rpc GetFlushState(milvus.GetFlushStateRequest) returns (milvus.GetFlushStateResponse) {}
rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {}
}
service DataNode {
......@@ -364,3 +365,24 @@ message WatchChannelsRequest {
message WatchChannelsResponse {
common.Status status = 1;
}
message DropVirtualChannelRequest {
common.MsgBase base = 1;
string channel_name = 2;
repeated DropVirtualChannelSegment segments = 3;
}
message DropVirtualChannelSegment {
int64 segmentID = 1;
int64 collectionID = 2;
repeated FieldBinlog field2BinlogPaths = 3;
repeated FieldBinlog field2StatslogPaths = 4;
repeated DeltaLogInfo deltalogs = 5;
internal.MsgPosition startPosition = 6;
internal.MsgPosition checkPoint = 7;
int64 numOfRows = 8;
}
message DropVirtualChannelResponse {
common.Status status = 1;
}
......@@ -202,6 +202,10 @@ func (coord *DataCoordMock) GetFlushState(ctx context.Context, req *milvuspb.Get
return &milvuspb.GetFlushStateResponse{}, nil
}
func (coord *DataCoordMock) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
return &datapb.DropVirtualChannelResponse{}, nil
}
func NewDataCoordMock() *DataCoordMock {
return &DataCoordMock{
nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
......
......@@ -199,7 +199,7 @@ type DataCoord interface {
// and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
// req contains the segment binlogs and checkpoint information
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
......@@ -232,6 +232,17 @@ type DataCoord interface {
WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error)
// GetFlushState gets the flush state of multiple segments
GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error)
// DropVirtualChannel notifies DataCoord a virtual channel is dropped and
// updates the related segments' binlogs (including insert binlogs, stats logs and delta logs)
// and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the dropped virtual channel name and related segment information
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)
}
// IndexNode is the interface `indexnode` package implements
......