Unverified commit 1e6f6bd0, authored by Ten Thousand Leaves, committed by GitHub

Implement set segment state call from RootCoord to DataCoord. (#16374)

/kind feature

issue: #15604
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Parent 78200009
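
For context, here is a minimal sketch (not part of the diff) of how a caller drives the new RPC end to end. `setSegmentFlushed` is a hypothetical helper; `dc` stands for any implementation of the extended `types.DataCoord` interface, such as the gRPC client changed below:

func setSegmentFlushed(ctx context.Context, dc types.DataCoord, segID int64) error {
	// Build the request message added to datapb in this PR.
	resp, err := dc.SetSegmentState(ctx, &datapb.SetSegmentStateRequest{
		SegmentId: segID,
		NewState:  commonpb.SegmentState_Flushed,
	})
	if err != nil {
		return err
	}
	// A non-Success status also means the state was not updated.
	if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
		return errors.New(resp.GetStatus().GetReason())
	}
	return nil
}

This mirrors what the new `CallUpdateSegmentStateService` callback in RootCoord does when `bringSegmentsOnline` marks imported segments `Flushed`.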
......@@ -93,8 +93,8 @@ var _ types.DataCoord = (*Server)(nil)
var Params paramtable.ComponentParam
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
ctx context.Context
serverLoopCtx context.Context
......
......@@ -31,6 +31,7 @@ import (
"time"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -2261,6 +2262,116 @@ func TestGetFlushState(t *testing.T) {
})
}
type mockTxnKVext struct {
kv.MockTxnKV
}
func (m *mockTxnKVext) LoadWithPrefix(prefix string) ([]string, []string, error) {
return []string{}, []string{}, nil
}
func (m *mockTxnKVext) MultiSave(kvs map[string]string) error {
return errors.New("(testing only) injected error")
}
func TestDataCoordServer_SetSegmentState(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segment := &datapb.SegmentInfo{
ID: 1000,
CollectionID: 100,
PartitionID: 0,
InsertChannel: "c1",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "c1",
MsgID: []byte{},
MsgGroup: "",
Timestamp: 0,
},
}
err := svr.meta.AddSegment(NewSegmentInfo(segment))
assert.Nil(t, err)
// Set segment state.
svr.SetSegmentState(context.TODO(), &datapb.SetSegmentStateRequest{
SegmentId: 1000,
NewState: commonpb.SegmentState_Flushed,
})
// Verify that the state has been updated.
resp, err := svr.GetSegmentStates(context.TODO(), &datapb.GetSegmentStatesRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
SegmentIDs: []int64{1000},
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, 1, len(resp.States))
assert.EqualValues(t, commonpb.SegmentState_Flushed, resp.States[0].State)
})
t.Run("dataCoord meta set state error", func(t *testing.T) {
svr := newTestServer(t, nil)
svr.meta.Lock()
func() {
defer svr.meta.Unlock()
svr.meta, _ = newMeta(&mockTxnKVext{})
}()
defer closeTestServer(t, svr)
segment := &datapb.SegmentInfo{
ID: 1000,
CollectionID: 100,
PartitionID: 0,
InsertChannel: "c1",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "c1",
MsgID: []byte{},
MsgGroup: "",
Timestamp: 0,
},
}
svr.meta.AddSegment(NewSegmentInfo(segment))
// Set segment state.
svr.SetSegmentState(context.TODO(), &datapb.SetSegmentStateRequest{
SegmentId: 1000,
NewState: commonpb.SegmentState_Flushed,
})
// Verify that the state has been updated.
resp, err := svr.GetSegmentStates(context.TODO(), &datapb.GetSegmentStatesRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
SegmentIDs: []int64{1000},
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, 1, len(resp.States))
assert.EqualValues(t, commonpb.SegmentState_Flushed, resp.States[0].State)
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.SetSegmentState(context.TODO(), &datapb.SetSegmentStateRequest{
SegmentId: 1000,
NewState: commonpb.SegmentState_Flushed,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestImport(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
......@@ -2279,9 +2390,7 @@ func TestImport(t *testing.T) {
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.GetErrorCode())
etcd.StopEtcdServer()
})
t.Run("no free node", func(t *testing.T) {
......@@ -2302,9 +2411,7 @@ func TestImport(t *testing.T) {
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.GetErrorCode())
etcd.StopEtcdServer()
})
t.Run("no datanode available", func(t *testing.T) {
......@@ -2319,9 +2426,7 @@ func TestImport(t *testing.T) {
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
etcd.StopEtcdServer()
})
t.Run("with closed server", func(t *testing.T) {
......
......@@ -446,6 +446,35 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
return resp, nil
}
// SetSegmentState resets the state of the given segment.
func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
if s.isClosed() {
return &datapb.SetSegmentStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: serverNotServingErrMsg,
},
}, nil
}
err := s.meta.SetState(req.GetSegmentId(), req.GetNewState())
if err != nil {
log.Error("failed to updated segment state in dataCoord meta",
zap.Int64("segment ID", req.SegmentId),
zap.String("to state", req.GetNewState().String()))
return &datapb.SetSegmentStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
}, nil
}
return &datapb.SetSegmentStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
// GetComponentStates returns DataCoord's current state
func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
nodeID := common.NotRegisteredID
......
......@@ -487,6 +487,20 @@ func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
return ret.(*datapb.DropVirtualChannelResponse), err
}
// SetSegmentState sets the state of a given segment.
func (c *Client) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(datapb.DataCoordClient).SetSegmentState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.SetSegmentStateResponse), err
}
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
......
......@@ -325,6 +325,11 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
return s.dataCoord.DropVirtualChannel(ctx, req)
}
// SetSegmentState sets the state of a segment.
func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
return s.dataCoord.SetSegmentState(ctx, req)
}
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (s *Server) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
return s.dataCoord.Import(ctx, req)
......
......@@ -55,6 +55,7 @@ type MockDataCoord struct {
watchChannelsResp *datapb.WatchChannelsResponse
getFlushStateResp *milvuspb.GetFlushStateResponse
dropVChanResp *datapb.DropVirtualChannelResponse
setSegmentStateResp *datapb.SetSegmentStateResponse
importResp *datapb.ImportTaskResponse
}
......@@ -165,6 +166,10 @@ func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.Drop
return m.dropVChanResp, m.err
}
func (m *MockDataCoord) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
return m.setSegmentStateResp, m.err
}
func (m *MockDataCoord) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
return m.importResp, m.err
}
......@@ -379,6 +384,15 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, resp)
})
t.Run("set segment state", func(t *testing.T) {
server.dataCoord = &MockDataCoord{
setSegmentStateResp: &datapb.SetSegmentStateResponse{},
}
resp, err := server.SetSegmentState(ctx, nil)
assert.Nil(t, err)
assert.NotNil(t, resp)
})
t.Run("Import", func(t *testing.T) {
server.dataCoord = &MockDataCoord{
importResp: &datapb.ImportTaskResponse{
......
......@@ -470,6 +470,10 @@ func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.Drop
return &datapb.DropVirtualChannelResponse{}, nil
}
func (m *MockDataCoord) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
return &datapb.SetSegmentStateResponse{}, nil
}
func (m *MockDataCoord) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
return nil, nil
}
......
......@@ -164,16 +164,20 @@ func TestGrpcService(t *testing.T) {
return nil
}
segs := []typeutil.UniqueID{}
var segs []typeutil.UniqueID
segLock := sync.Mutex{}
core.CallGetFlushedSegmentsService = func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error) {
segLock.Lock()
defer segLock.Unlock()
ret := []typeutil.UniqueID{}
var ret []typeutil.UniqueID
ret = append(ret, segs...)
return ret, nil
}
core.CallUpdateSegmentStateService = func(ctx context.Context, segID typeutil.UniqueID, ss commonpb.SegmentState) error {
return nil
}
var binlogLock sync.Mutex
binlogPathArray := make([]string, 0, 16)
core.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error) {
......
......@@ -24,67 +24,67 @@ import (
"go.uber.org/zap"
)
type mockBaseKV struct {
type MockBaseKV struct {
InMemKv map[string]string
}
func (m *mockBaseKV) Load(key string) (string, error) {
func (m *MockBaseKV) Load(key string) (string, error) {
if val, ok := m.InMemKv[key]; ok {
return val, nil
}
return "", nil
}
func (m *mockBaseKV) MultiLoad(keys []string) ([]string, error) {
func (m *MockBaseKV) MultiLoad(keys []string) ([]string, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockBaseKV) LoadWithPrefix(key string) ([]string, []string, error) {
func (m *MockBaseKV) LoadWithPrefix(key string) ([]string, []string, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockBaseKV) Save(key string, value string) error {
func (m *MockBaseKV) Save(key string, value string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockBaseKV) MultiSave(kvs map[string]string) error {
func (m *MockBaseKV) MultiSave(kvs map[string]string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockBaseKV) Remove(key string) error {
func (m *MockBaseKV) Remove(key string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockBaseKV) MultiRemove(keys []string) error {
func (m *MockBaseKV) MultiRemove(keys []string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockBaseKV) RemoveWithPrefix(key string) error {
func (m *MockBaseKV) RemoveWithPrefix(key string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockBaseKV) Close() {
func (m *MockBaseKV) Close() {
panic("not implemented") // TODO: Implement
}
type mockTxnKV struct {
mockBaseKV
type MockTxnKV struct {
MockBaseKV
}
func (m *mockTxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
func (m *MockTxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockTxnKV) MultiRemoveWithPrefix(keys []string) error {
func (m *MockTxnKV) MultiRemoveWithPrefix(keys []string) error {
panic("not implemented") // TODO: Implement
}
func (m *mockTxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
func (m *MockTxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
panic("not implemented") // TODO: Implement
}
type MockMetaKV struct {
mockTxnKV
MockTxnKV
}
func (m *MockMetaKV) GetPath(key string) string {
......
......@@ -42,6 +42,7 @@ service DataCoord {
rpc GetFlushState(milvus.GetFlushStateRequest) returns (milvus.GetFlushStateResponse) {}
rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {}
rpc SetSegmentState(SetSegmentStateRequest) returns (SetSegmentStateResponse) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
rpc Import(ImportTaskRequest) returns (ImportTaskResponse) {}
}
......@@ -397,6 +398,16 @@ message WatchChannelsResponse {
common.Status status = 1;
}
message SetSegmentStateRequest {
common.MsgBase base = 1;
int64 segment_id = 2;
common.SegmentState new_state = 3;
}
message SetSegmentStateResponse {
common.Status status = 1;
}
message DropVirtualChannelRequest {
common.MsgBase base = 1;
string channel_name = 2;
......
......@@ -206,6 +206,10 @@ func (coord *DataCoordMock) DropVirtualChannel(ctx context.Context, req *datapb.
return &datapb.DropVirtualChannelResponse{}, nil
}
func (coord *DataCoordMock) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
return &datapb.SetSegmentStateResponse{}, nil
}
func (coord *DataCoordMock) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
return &datapb.ImportTaskResponse{}, nil
}
......
......@@ -458,12 +458,6 @@ func (m *importManager) updateImportTaskStore(ti *datapb.ImportTaskInfo) error {
return nil
}
// bringSegmentsOnline brings the segments online so that data in these segments become searchable.
func (m *importManager) bringSegmentsOnline(ti *datapb.ImportTaskInfo) {
log.Info("Bringing import tasks segments online!", zap.Int64("Task ID", ti.GetId()))
// TODO: Implement it.
}
// expireOldTasksLoop starts a loop that checks and expires old tasks every `ImportTaskExpiration` seconds.
func (m *importManager) expireOldTasksLoop(wg *sync.WaitGroup) {
defer wg.Done()
......@@ -476,7 +470,8 @@ func (m *importManager) expireOldTasksLoop(wg *sync.WaitGroup) {
return
case <-ticker.C:
log.Info("(in loop) starting expiring old tasks...",
zap.Any("cleaning up interval", time.Duration(Params.RootCoordCfg.ImportTaskExpiration)))
zap.Duration("cleaning up interval",
time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000)*time.Millisecond))
m.expireOldTasks()
}
}
......@@ -490,7 +485,7 @@ func (m *importManager) expireOldTasks() {
defer m.pendingLock.Unlock()
for _, t := range m.pendingTasks {
if taskExpired(t) {
log.Info("a pending task has expired", zap.Any("task info", t))
log.Info("a pending task has expired", zap.Int64("task ID", t.GetId()))
t.State.StateCode = commonpb.ImportState_ImportFailed
t.State.ErrorMessage = taskExpiredMsgPrefix +
(time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000) * time.Millisecond).String()
......@@ -505,7 +500,7 @@ func (m *importManager) expireOldTasks() {
for _, v := range m.workingTasks {
// Mark this expired task as failed.
if taskExpired(v) {
log.Info("a working task has expired", zap.Any("task info", v))
log.Info("a working task has expired", zap.Int64("task ID", v.GetId()))
v.State.StateCode = commonpb.ImportState_ImportFailed
v.State.ErrorMessage = taskExpiredMsgPrefix +
(time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000) * time.Millisecond).String()
......
......@@ -147,6 +147,9 @@ type Core struct {
CallWatchChannels func(ctx context.Context, collectionID int64, channelNames []string) error
// Update segment state.
CallUpdateSegmentStateService func(ctx context.Context, segID typeutil.UniqueID, ss commonpb.SegmentState) error
// assign import task to data service
CallImportService func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse
......@@ -274,6 +277,9 @@ func (c *Core) checkInit() error {
if c.CallGetFlushedSegmentsService == nil {
return fmt.Errorf("callGetFlushedSegmentsService is nil")
}
if c.CallUpdateSegmentStateService == nil {
return fmt.Errorf("CallUpdateSegmentStateService is nil")
}
if c.CallWatchChannels == nil {
return fmt.Errorf("callWatchChannels is nil")
}
......@@ -597,7 +603,7 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.Proxy,
}
}
// SetDataCoord set datacoord
// SetDataCoord set dataCoord.
func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
initCh := make(chan struct{})
go func() {
......@@ -714,6 +720,29 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
return rsp.Segments, nil
}
c.CallUpdateSegmentStateService = func(ctx context.Context, segID typeutil.UniqueID, ss commonpb.SegmentState) (retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("update segment state from data coord panic, msg = %v", err)
}
}()
<-initCh
req := &datapb.SetSegmentStateRequest{
SegmentId: segID,
NewState: ss,
}
resp, err := s.SetSegmentState(ctx, req)
if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("failed to update segment state",
zap.Any("request", req), zap.Any("response", resp), zap.Error(err))
if err != nil {
return err
}
return fmt.Errorf("failed to update segment state: %s", resp.GetStatus().GetReason())
}
log.Info("successfully set segment state",
zap.Int64("segment ID", req.GetSegmentId()),
zap.String("new segment state", req.GetNewState().String()))
return nil
}
c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) (retErr error) {
defer func() {
if err := recover(); err != nil {
......@@ -2263,13 +2292,14 @@ func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateR
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
}, nil
}
return c.importManager.getTaskState(req.Task), nil
return c.importManager.getTaskState(req.GetTask()), nil
}
// ReportImport reports import task state to RootCoord.
func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (*commonpb.Status, error) {
log.Info("receive import state report", zap.Any("import result", ir))
log.Info("receive import state report",
zap.Int64("task ID", ir.GetTaskId()),
zap.Any("import state", ir.GetState()))
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
......@@ -2428,30 +2458,41 @@ func (c *Core) checkCompleteIndexLoop(ctx context.Context, ti *datapb.ImportTask
for {
select {
case <-c.ctx.Done():
log.Info("(in loop)context done, exiting checkCompleteIndexLoop",
log.Info("(in check complete index loop) context done, exiting checkCompleteIndexLoop",
zap.Int64("task ID", ti.GetId()))
return
case <-ticker.C:
log.Info("(in loop)check segments' index states", zap.Int64("task ID", ti.GetId()))
log.Info("(in check complete index loop) check segments' index states", zap.Int64("task ID", ti.GetId()))
if ct, err := c.CountCompleteIndex(ctx, colName, ti.GetCollectionId(), segIDs); err == nil &&
segmentsOnlineReady(ct, len(segIDs)) {
log.Info("(in loop)segment indices are ready",
log.Info("segment indices are ready",
zap.Int64("task ID", ti.GetId()),
zap.Int("total # of segments", len(segIDs)),
zap.Int("# of segments with index ready", ct))
c.importManager.bringSegmentsOnline(ti)
c.bringSegmentsOnline(ctx, segIDs)
return
}
case <-expireTicker.C:
log.Info("(in loop)waited for sufficiently long time, bring segments online",
log.Info("(in check complete index loop) waited for sufficiently long time, bring segments online",
zap.Int64("task ID", ti.GetId()))
c.importManager.bringSegmentsOnline(ti)
c.bringSegmentsOnline(ctx, segIDs)
return
}
}
}
// segmentsOnlineReady returns true if segments are ready to go up online (a.k.a. searchable).
// bringSegmentsOnline brings the segments online so that data in these segments become searchable.
// It is done by changing the segments' states from `importing` to `flushed`.
func (c *Core) bringSegmentsOnline(ctx context.Context, segIDs []UniqueID) {
log.Info("bringing import task's segments online!", zap.Any("segment IDs", segIDs))
// TODO: Make update on segment states atomic.
for _, id := range segIDs {
// Explicitly mark segment states `flushed`.
c.CallUpdateSegmentStateService(ctx, id, commonpb.SegmentState_Flushed)
}
}
// segmentsOnlineReady returns true if segments are ready to go up online (a.k.a. become searchable).
func segmentsOnlineReady(idxBuilt, segCount int) bool {
// Consider segments are ready when:
// (1) all but up to 2 segments have indices ready, or
......
......@@ -255,6 +255,8 @@ type DataCoord interface {
WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error)
// GetFlushState gets the flush state of multiple segments
GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error)
// SetSegmentState updates a segment's state explicitly.
SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error)
// DropVirtualChannel notifies DataCoord a virtual channel is dropped and
// updates related segments binlogs(including insert binlogs, stats logs and delta logs)
......
......@@ -119,6 +119,10 @@ func (m *DataCoordClient) DropVirtualChannel(ctx context.Context, req *datapb.Dr
return &datapb.DropVirtualChannelResponse{}, m.Err
}
func (m *DataCoordClient) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest, opts ...grpc.CallOption) (*datapb.SetSegmentStateResponse, error) {
return &datapb.SetSegmentStateResponse{}, m.Err
}
func (m *DataCoordClient) Import(ctx context.Context, req *datapb.ImportTaskRequest, opts ...grpc.CallOption) (*datapb.ImportTaskResponse, error) {
return &datapb.ImportTaskResponse{}, m.Err
}