未验证 提交 9fd9bed2 编写于 作者: J Jiquan Long 提交者: GitHub

Remove collection meta after GC finished (#21595)

Signed-off-by: Nlongjiquan <jiquan.long@zilliz.com>
上级 6d09bbed
......@@ -297,6 +297,9 @@ mock-datanode:
mock-rootcoord:
mockery --name=RootCoord --dir=$(PWD)/internal/types --output=$(PWD)/internal/mocks --filename=mock_rootcoord.go --with-expecter
mock-datacoord:
mockery --name=DataCoord --dir=$(PWD)/internal/types --output=$(PWD)/internal/mocks --filename=mock_datacoord.go --with-expecter
mock-tnx-kv:
mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=TxnKV.go --with-expecter
......
......@@ -1269,6 +1269,10 @@ func (m *meta) DropChannelCheckpoint(vChannel string) error {
return nil
}
func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
return m.catalog.GcConfirm(ctx, collectionID, partitionID)
}
// addNewSeg update metrics update for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, rowCount int64) {
s.stateChange[state.String()]++
......
......@@ -23,19 +23,18 @@ import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestMetaReloadFromKV(t *testing.T) {
......@@ -945,3 +944,17 @@ func TestChannelCP(t *testing.T) {
assert.NoError(t, err)
})
}
func Test_meta_GcConfirm(t *testing.T) {
m := &meta{}
catalog := mocks.NewDataCoordCatalog(t)
m.catalog = catalog
catalog.On("GcConfirm",
mock.Anything,
mock.AnythingOfType("int64"),
mock.AnythingOfType("int64")).
Return(false)
assert.False(t, m.GcConfirm(context.TODO(), 100, 10000))
}
......@@ -1443,3 +1443,21 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
return &milvuspb.CheckHealthResponse{IsHealthy: true, Reasons: errReasons}, nil
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
resp := &datapb.GcConfirmResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
GcFinished: false,
}
if s.isClosed() {
resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
return resp, nil
}
resp.GcFinished = s.meta.GcConfirm(ctx, request.GetCollectionId(), request.GetPartitionId())
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
......@@ -4,6 +4,9 @@ import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
......@@ -54,3 +57,35 @@ func TestBroadcastAlteredCollection(t *testing.T) {
assert.NotNil(t, s.meta.collections[1].Properties)
})
}
func TestServer_GcConfirm(t *testing.T) {
t.Run("closed server", func(t *testing.T) {
s := &Server{}
s.stateCode.Store(commonpb.StateCode_Initializing)
resp, err := s.GcConfirm(context.TODO(), &datapb.GcConfirmRequest{CollectionId: 100, PartitionId: 10000})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case", func(t *testing.T) {
s := &Server{}
s.stateCode.Store(commonpb.StateCode_Healthy)
m := &meta{}
catalog := mocks.NewDataCoordCatalog(t)
m.catalog = catalog
catalog.On("GcConfirm",
mock.Anything,
mock.AnythingOfType("int64"),
mock.AnythingOfType("int64")).
Return(false)
s.meta = m
resp, err := s.GcConfirm(context.TODO(), &datapb.GcConfirmRequest{CollectionId: 100, PartitionId: 10000})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.False(t, resp.GetGcFinished())
})
}
......@@ -760,6 +760,19 @@ func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
return ret.(*milvuspb.CheckHealthResponse), err
}
func (c *Client) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.GcConfirm(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GcConfirmResponse), err
}
// CreateIndex sends the build index request to IndexCoord.
func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
......
......@@ -386,6 +386,10 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
return s.dataCoord.CheckHealth(ctx, req)
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
return s.dataCoord.GcConfirm(ctx, request)
}
// CreateIndex sends the build index request to DataCoord.
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
return s.dataCoord.CreateIndex(ctx, req)
......
......@@ -21,12 +21,12 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
......@@ -34,6 +34,8 @@ import (
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockDataCoord struct {
types.DataCoord
states *milvuspb.ComponentStates
status *commonpb.Status
err error
......
......@@ -554,6 +554,10 @@ func (m *MockDataCoord) CheckHealth(ctx context.Context, req *milvuspb.CheckHeal
return nil, nil
}
func (m *MockDataCoord) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
return nil, nil
}
func (m *MockDataCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
return nil, nil
}
......
......@@ -82,10 +82,10 @@ type MetaKv_CompareValueAndSwap_Call struct {
}
// CompareValueAndSwap is a helper method to define mock.On call
// - key string
// - value string
// - target string
// - opts ...clientv3.OpOption
// - key string
// - value string
// - target string
// - opts ...clientv3.OpOption
func (_e *MetaKv_Expecter) CompareValueAndSwap(key interface{}, value interface{}, target interface{}, opts ...interface{}) *MetaKv_CompareValueAndSwap_Call {
return &MetaKv_CompareValueAndSwap_Call{Call: _e.mock.On("CompareValueAndSwap",
append([]interface{}{key, value, target}, opts...)...)}
......@@ -143,10 +143,10 @@ type MetaKv_CompareVersionAndSwap_Call struct {
}
// CompareVersionAndSwap is a helper method to define mock.On call
// - key string
// - version int64
// - target string
// - opts ...clientv3.OpOption
// - key string
// - version int64
// - target string
// - opts ...clientv3.OpOption
func (_e *MetaKv_Expecter) CompareVersionAndSwap(key interface{}, version interface{}, target interface{}, opts ...interface{}) *MetaKv_CompareVersionAndSwap_Call {
return &MetaKv_CompareVersionAndSwap_Call{Call: _e.mock.On("CompareVersionAndSwap",
append([]interface{}{key, version, target}, opts...)...)}
......@@ -190,7 +190,7 @@ type MetaKv_GetPath_Call struct {
}
// GetPath is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) GetPath(key interface{}) *MetaKv_GetPath_Call {
return &MetaKv_GetPath_Call{Call: _e.mock.On("GetPath", key)}
}
......@@ -234,7 +234,7 @@ type MetaKv_Grant_Call struct {
}
// Grant is a helper method to define mock.On call
// - ttl int64
// - ttl int64
func (_e *MetaKv_Expecter) Grant(ttl interface{}) *MetaKv_Grant_Call {
return &MetaKv_Grant_Call{Call: _e.mock.On("Grant", ttl)}
}
......@@ -280,7 +280,7 @@ type MetaKv_KeepAlive_Call struct {
}
// KeepAlive is a helper method to define mock.On call
// - id clientv3.LeaseID
// - id clientv3.LeaseID
func (_e *MetaKv_Expecter) KeepAlive(id interface{}) *MetaKv_KeepAlive_Call {
return &MetaKv_KeepAlive_Call{Call: _e.mock.On("KeepAlive", id)}
}
......@@ -324,7 +324,7 @@ type MetaKv_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) Load(key interface{}) *MetaKv_Load_Call {
return &MetaKv_Load_Call{Call: _e.mock.On("Load", key)}
}
......@@ -379,7 +379,7 @@ type MetaKv_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithPrefix(key interface{}) *MetaKv_LoadWithPrefix_Call {
return &MetaKv_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)}
}
......@@ -443,7 +443,7 @@ type MetaKv_LoadWithPrefix2_Call struct {
}
// LoadWithPrefix2 is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithPrefix2(key interface{}) *MetaKv_LoadWithPrefix2_Call {
return &MetaKv_LoadWithPrefix2_Call{Call: _e.mock.On("LoadWithPrefix2", key)}
}
......@@ -505,7 +505,7 @@ type MetaKv_LoadWithRevision_Call struct {
}
// LoadWithRevision is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithRevision(key interface{}) *MetaKv_LoadWithRevision_Call {
return &MetaKv_LoadWithRevision_Call{Call: _e.mock.On("LoadWithRevision", key)}
}
......@@ -576,7 +576,7 @@ type MetaKv_LoadWithRevisionAndVersions_Call struct {
}
// LoadWithRevisionAndVersions is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithRevisionAndVersions(key interface{}) *MetaKv_LoadWithRevisionAndVersions_Call {
return &MetaKv_LoadWithRevisionAndVersions_Call{Call: _e.mock.On("LoadWithRevisionAndVersions", key)}
}
......@@ -622,7 +622,7 @@ type MetaKv_MultiLoad_Call struct {
}
// MultiLoad is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *MetaKv_Expecter) MultiLoad(keys interface{}) *MetaKv_MultiLoad_Call {
return &MetaKv_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)}
}
......@@ -659,7 +659,7 @@ type MetaKv_MultiRemove_Call struct {
}
// MultiRemove is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *MetaKv_Expecter) MultiRemove(keys interface{}) *MetaKv_MultiRemove_Call {
return &MetaKv_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)}
}
......@@ -696,7 +696,7 @@ type MetaKv_MultiRemoveWithPrefix_Call struct {
}
// MultiRemoveWithPrefix is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *MetaKv_Expecter) MultiRemoveWithPrefix(keys interface{}) *MetaKv_MultiRemoveWithPrefix_Call {
return &MetaKv_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)}
}
......@@ -733,7 +733,7 @@ type MetaKv_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - kvs map[string]string
func (_e *MetaKv_Expecter) MultiSave(kvs interface{}) *MetaKv_MultiSave_Call {
return &MetaKv_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)}
}
......@@ -770,8 +770,8 @@ type MetaKv_MultiSaveAndRemove_Call struct {
}
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *MetaKv_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemove_Call {
return &MetaKv_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
}
......@@ -808,8 +808,8 @@ type MetaKv_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *MetaKv_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
return &MetaKv_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
}
......@@ -846,7 +846,7 @@ type MetaKv_Remove_Call struct {
}
// Remove is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) Remove(key interface{}) *MetaKv_Remove_Call {
return &MetaKv_Remove_Call{Call: _e.mock.On("Remove", key)}
}
......@@ -883,7 +883,7 @@ type MetaKv_RemoveWithPrefix_Call struct {
}
// RemoveWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) RemoveWithPrefix(key interface{}) *MetaKv_RemoveWithPrefix_Call {
return &MetaKv_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)}
}
......@@ -920,8 +920,8 @@ type MetaKv_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *MetaKv_Expecter) Save(key interface{}, value interface{}) *MetaKv_Save_Call {
return &MetaKv_Save_Call{Call: _e.mock.On("Save", key, value)}
}
......@@ -958,8 +958,8 @@ type MetaKv_SaveWithIgnoreLease_Call struct {
}
// SaveWithIgnoreLease is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *MetaKv_Expecter) SaveWithIgnoreLease(key interface{}, value interface{}) *MetaKv_SaveWithIgnoreLease_Call {
return &MetaKv_SaveWithIgnoreLease_Call{Call: _e.mock.On("SaveWithIgnoreLease", key, value)}
}
......@@ -996,9 +996,9 @@ type MetaKv_SaveWithLease_Call struct {
}
// SaveWithLease is a helper method to define mock.On call
// - key string
// - value string
// - id clientv3.LeaseID
// - key string
// - value string
// - id clientv3.LeaseID
func (_e *MetaKv_Expecter) SaveWithLease(key interface{}, value interface{}, id interface{}) *MetaKv_SaveWithLease_Call {
return &MetaKv_SaveWithLease_Call{Call: _e.mock.On("SaveWithLease", key, value, id)}
}
......@@ -1035,9 +1035,9 @@ type MetaKv_WalkWithPrefix_Call struct {
}
// WalkWithPrefix is a helper method to define mock.On call
// - prefix string
// - paginationSize int
// - fn func([]byte , []byte) error
// - prefix string
// - paginationSize int
// - fn func([]byte , []byte) error
func (_e *MetaKv_Expecter) WalkWithPrefix(prefix interface{}, paginationSize interface{}, fn interface{}) *MetaKv_WalkWithPrefix_Call {
return &MetaKv_WalkWithPrefix_Call{Call: _e.mock.On("WalkWithPrefix", prefix, paginationSize, fn)}
}
......@@ -1076,7 +1076,7 @@ type MetaKv_Watch_Call struct {
}
// Watch is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) Watch(key interface{}) *MetaKv_Watch_Call {
return &MetaKv_Watch_Call{Call: _e.mock.On("Watch", key)}
}
......@@ -1115,7 +1115,7 @@ type MetaKv_WatchWithPrefix_Call struct {
}
// WatchWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) WatchWithPrefix(key interface{}) *MetaKv_WatchWithPrefix_Call {
return &MetaKv_WatchWithPrefix_Call{Call: _e.mock.On("WatchWithPrefix", key)}
}
......@@ -1154,8 +1154,8 @@ type MetaKv_WatchWithRevision_Call struct {
}
// WatchWithRevision is a helper method to define mock.On call
// - key string
// - revision int64
// - key string
// - revision int64
func (_e *MetaKv_Expecter) WatchWithRevision(key interface{}, revision interface{}) *MetaKv_WatchWithRevision_Call {
return &MetaKv_WatchWithRevision_Call{Call: _e.mock.On("WatchWithRevision", key, revision)}
}
......
......@@ -44,8 +44,8 @@ type SnapShotKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) Load(key interface{}, ts interface{}) *SnapShotKV_Load_Call {
return &SnapShotKV_Load_Call{Call: _e.mock.On("Load", key, ts)}
}
......@@ -100,8 +100,8 @@ type SnapShotKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) LoadWithPrefix(key interface{}, ts interface{}) *SnapShotKV_LoadWithPrefix_Call {
return &SnapShotKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key, ts)}
}
......@@ -138,8 +138,8 @@ type SnapShotKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - ts uint64
// - kvs map[string]string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSave(kvs interface{}, ts interface{}) *SnapShotKV_MultiSave_Call {
return &SnapShotKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs, ts)}
}
......@@ -176,9 +176,9 @@ type SnapShotKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - ts uint64
// - saves map[string]string
// - removals []string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call {
return &SnapShotKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals, ts)}
}
......@@ -215,9 +215,9 @@ type SnapShotKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - ts uint64
// - key string
// - value string
// - ts uint64
func (_e *SnapShotKV_Expecter) Save(key interface{}, value interface{}, ts interface{}) *SnapShotKV_Save_Call {
return &SnapShotKV_Save_Call{Call: _e.mock.On("Save", key, value, ts)}
}
......
......@@ -71,7 +71,7 @@ type TxnKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Load(key interface{}) *TxnKV_Load_Call {
return &TxnKV_Load_Call{Call: _e.mock.On("Load", key)}
}
......@@ -126,7 +126,7 @@ type TxnKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) LoadWithPrefix(key interface{}) *TxnKV_LoadWithPrefix_Call {
return &TxnKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)}
}
......@@ -172,7 +172,7 @@ type TxnKV_MultiLoad_Call struct {
}
// MultiLoad is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiLoad(keys interface{}) *TxnKV_MultiLoad_Call {
return &TxnKV_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)}
}
......@@ -209,7 +209,7 @@ type TxnKV_MultiRemove_Call struct {
}
// MultiRemove is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemove(keys interface{}) *TxnKV_MultiRemove_Call {
return &TxnKV_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)}
}
......@@ -246,7 +246,7 @@ type TxnKV_MultiRemoveWithPrefix_Call struct {
}
// MultiRemoveWithPrefix is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemoveWithPrefix(keys interface{}) *TxnKV_MultiRemoveWithPrefix_Call {
return &TxnKV_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)}
}
......@@ -283,7 +283,7 @@ type TxnKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - kvs map[string]string
func (_e *TxnKV_Expecter) MultiSave(kvs interface{}) *TxnKV_MultiSave_Call {
return &TxnKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)}
}
......@@ -320,8 +320,8 @@ type TxnKV_MultiSaveAndRemove_Call struct {
}
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call {
return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
}
......@@ -358,8 +358,8 @@ type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
}
......@@ -396,7 +396,7 @@ type TxnKV_Remove_Call struct {
}
// Remove is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Remove(key interface{}) *TxnKV_Remove_Call {
return &TxnKV_Remove_Call{Call: _e.mock.On("Remove", key)}
}
......@@ -433,7 +433,7 @@ type TxnKV_RemoveWithPrefix_Call struct {
}
// RemoveWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) RemoveWithPrefix(key interface{}) *TxnKV_RemoveWithPrefix_Call {
return &TxnKV_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)}
}
......@@ -470,8 +470,8 @@ type TxnKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *TxnKV_Expecter) Save(key interface{}, value interface{}) *TxnKV_Save_Call {
return &TxnKV_Save_Call{Call: _e.mock.On("Save", key, value)}
}
......
......@@ -130,6 +130,8 @@ type DataCoordCatalog interface {
AlterSegmentIndex(ctx context.Context, newSegIndex *model.SegmentIndex) error
AlterSegmentIndexes(ctx context.Context, newSegIdxes []*model.SegmentIndex) error
DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error
GcConfirm(ctx context.Context, collectionID, partitionID typeutil.UniqueID) bool
}
type IndexCoordCatalog interface {
......
......@@ -685,6 +685,23 @@ func (kc *Catalog) DropSegmentIndex(ctx context.Context, collID, partID, segID,
return nil
}
const allPartitionID = -1
// GcConfirm returns true if related collection/partition is not found.
// DataCoord will remove all the meta eventually after GC is finished.
func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID typeutil.UniqueID) bool {
prefix := buildCollectionPrefix(collectionID)
if partitionID != allPartitionID {
prefix = buildPartitionPrefix(collectionID, partitionID)
}
keys, values, err := kc.MetaKv.LoadWithPrefix(prefix)
if err != nil {
// error case can be regarded as not finished.
return false
}
return len(keys) == 0 && len(values) == 0
}
func fillLogPathByLogID(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID,
segmentID typeutil.UniqueID, fieldBinlog *datapb.FieldBinlog) error {
for _, binlog := range fieldBinlog.Binlogs {
......@@ -956,3 +973,11 @@ func BuildIndexKey(collectionID, indexID int64) string {
func BuildSegmentIndexKey(collectionID, partitionID, segmentID, buildID int64) string {
return fmt.Sprintf("%s/%d/%d/%d/%d", util.SegmentIndexPrefix, collectionID, partitionID, segmentID, buildID)
}
func buildCollectionPrefix(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", SegmentPrefix, collectionID)
}
func buildPartitionPrefix(collectionID, partitionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d", SegmentPrefix, collectionID, partitionID)
}
......@@ -1346,3 +1346,20 @@ func TestMain(m *testing.M) {
code := m.Run()
os.Exit(code)
}
func TestCatalog_GcConfirm(t *testing.T) {
kc := &Catalog{}
txn := mocks.NewMetaKv(t)
kc.MetaKv = txn
txn.On("LoadWithPrefix",
mock.AnythingOfType("string")).
Return(nil, nil, errors.New("error mock LoadWithPrefix")).
Once()
assert.False(t, kc.GcConfirm(context.TODO(), 100, 10000))
txn.On("LoadWithPrefix",
mock.AnythingOfType("string")).
Return(nil, nil, nil)
assert.True(t, kc.GcConfirm(context.TODO(), 100, 10000))
}
......@@ -46,8 +46,8 @@ type DataCoordCatalog_AddSegment_Call struct {
}
// AddSegment is a helper method to define mock.On call
// - ctx context.Context
// - segment *datapb.SegmentInfo
// - ctx context.Context
// - segment *datapb.SegmentInfo
func (_e *DataCoordCatalog_Expecter) AddSegment(ctx interface{}, segment interface{}) *DataCoordCatalog_AddSegment_Call {
return &DataCoordCatalog_AddSegment_Call{Call: _e.mock.On("AddSegment", ctx, segment)}
}
......@@ -84,8 +84,8 @@ type DataCoordCatalog_AlterIndex_Call struct {
}
// AlterIndex is a helper method to define mock.On call
// - ctx context.Context
// - newIndex *model.Index
// - ctx context.Context
// - newIndex *model.Index
func (_e *DataCoordCatalog_Expecter) AlterIndex(ctx interface{}, newIndex interface{}) *DataCoordCatalog_AlterIndex_Call {
return &DataCoordCatalog_AlterIndex_Call{Call: _e.mock.On("AlterIndex", ctx, newIndex)}
}
......@@ -122,8 +122,8 @@ type DataCoordCatalog_AlterIndexes_Call struct {
}
// AlterIndexes is a helper method to define mock.On call
// - ctx context.Context
// - newIndexes []*model.Index
// - ctx context.Context
// - newIndexes []*model.Index
func (_e *DataCoordCatalog_Expecter) AlterIndexes(ctx interface{}, newIndexes interface{}) *DataCoordCatalog_AlterIndexes_Call {
return &DataCoordCatalog_AlterIndexes_Call{Call: _e.mock.On("AlterIndexes", ctx, newIndexes)}
}
......@@ -160,9 +160,9 @@ type DataCoordCatalog_AlterSegment_Call struct {
}
// AlterSegment is a helper method to define mock.On call
// - ctx context.Context
// - newSegment *datapb.SegmentInfo
// - oldSegment *datapb.SegmentInfo
// - ctx context.Context
// - newSegment *datapb.SegmentInfo
// - oldSegment *datapb.SegmentInfo
func (_e *DataCoordCatalog_Expecter) AlterSegment(ctx interface{}, newSegment interface{}, oldSegment interface{}) *DataCoordCatalog_AlterSegment_Call {
return &DataCoordCatalog_AlterSegment_Call{Call: _e.mock.On("AlterSegment", ctx, newSegment, oldSegment)}
}
......@@ -199,8 +199,8 @@ type DataCoordCatalog_AlterSegmentIndex_Call struct {
}
// AlterSegmentIndex is a helper method to define mock.On call
// - ctx context.Context
// - newSegIndex *model.SegmentIndex
// - ctx context.Context
// - newSegIndex *model.SegmentIndex
func (_e *DataCoordCatalog_Expecter) AlterSegmentIndex(ctx interface{}, newSegIndex interface{}) *DataCoordCatalog_AlterSegmentIndex_Call {
return &DataCoordCatalog_AlterSegmentIndex_Call{Call: _e.mock.On("AlterSegmentIndex", ctx, newSegIndex)}
}
......@@ -237,8 +237,8 @@ type DataCoordCatalog_AlterSegmentIndexes_Call struct {
}
// AlterSegmentIndexes is a helper method to define mock.On call
// - ctx context.Context
// - newSegIdxes []*model.SegmentIndex
// - ctx context.Context
// - newSegIdxes []*model.SegmentIndex
func (_e *DataCoordCatalog_Expecter) AlterSegmentIndexes(ctx interface{}, newSegIdxes interface{}) *DataCoordCatalog_AlterSegmentIndexes_Call {
return &DataCoordCatalog_AlterSegmentIndexes_Call{Call: _e.mock.On("AlterSegmentIndexes", ctx, newSegIdxes)}
}
......@@ -275,8 +275,8 @@ type DataCoordCatalog_AlterSegments_Call struct {
}
// AlterSegments is a helper method to define mock.On call
// - ctx context.Context
// - newSegments []*datapb.SegmentInfo
// - ctx context.Context
// - newSegments []*datapb.SegmentInfo
func (_e *DataCoordCatalog_Expecter) AlterSegments(ctx interface{}, newSegments interface{}) *DataCoordCatalog_AlterSegments_Call {
return &DataCoordCatalog_AlterSegments_Call{Call: _e.mock.On("AlterSegments", ctx, newSegments)}
}
......@@ -313,9 +313,9 @@ type DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call struct {
}
// AlterSegmentsAndAddNewSegment is a helper method to define mock.On call
// - ctx context.Context
// - segments []*datapb.SegmentInfo
// - newSegment *datapb.SegmentInfo
// - ctx context.Context
// - segments []*datapb.SegmentInfo
// - newSegment *datapb.SegmentInfo
func (_e *DataCoordCatalog_Expecter) AlterSegmentsAndAddNewSegment(ctx interface{}, segments interface{}, newSegment interface{}) *DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call {
return &DataCoordCatalog_AlterSegmentsAndAddNewSegment_Call{Call: _e.mock.On("AlterSegmentsAndAddNewSegment", ctx, segments, newSegment)}
}
......@@ -352,8 +352,8 @@ type DataCoordCatalog_CreateIndex_Call struct {
}
// CreateIndex is a helper method to define mock.On call
// - ctx context.Context
// - index *model.Index
// - ctx context.Context
// - index *model.Index
func (_e *DataCoordCatalog_Expecter) CreateIndex(ctx interface{}, index interface{}) *DataCoordCatalog_CreateIndex_Call {
return &DataCoordCatalog_CreateIndex_Call{Call: _e.mock.On("CreateIndex", ctx, index)}
}
......@@ -390,8 +390,8 @@ type DataCoordCatalog_CreateSegmentIndex_Call struct {
}
// CreateSegmentIndex is a helper method to define mock.On call
// - ctx context.Context
// - segIdx *model.SegmentIndex
// - ctx context.Context
// - segIdx *model.SegmentIndex
func (_e *DataCoordCatalog_Expecter) CreateSegmentIndex(ctx interface{}, segIdx interface{}) *DataCoordCatalog_CreateSegmentIndex_Call {
return &DataCoordCatalog_CreateSegmentIndex_Call{Call: _e.mock.On("CreateSegmentIndex", ctx, segIdx)}
}
......@@ -428,8 +428,8 @@ type DataCoordCatalog_DropChannel_Call struct {
}
// DropChannel is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) DropChannel(ctx interface{}, channel interface{}) *DataCoordCatalog_DropChannel_Call {
return &DataCoordCatalog_DropChannel_Call{Call: _e.mock.On("DropChannel", ctx, channel)}
}
......@@ -466,8 +466,8 @@ type DataCoordCatalog_DropChannelCheckpoint_Call struct {
}
// DropChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - vChannel string
// - ctx context.Context
// - vChannel string
func (_e *DataCoordCatalog_Expecter) DropChannelCheckpoint(ctx interface{}, vChannel interface{}) *DataCoordCatalog_DropChannelCheckpoint_Call {
return &DataCoordCatalog_DropChannelCheckpoint_Call{Call: _e.mock.On("DropChannelCheckpoint", ctx, vChannel)}
}
......@@ -504,9 +504,9 @@ type DataCoordCatalog_DropIndex_Call struct {
}
// DropIndex is a helper method to define mock.On call
// - ctx context.Context
// - collID int64
// - dropIdxID int64
// - ctx context.Context
// - collID int64
// - dropIdxID int64
func (_e *DataCoordCatalog_Expecter) DropIndex(ctx interface{}, collID interface{}, dropIdxID interface{}) *DataCoordCatalog_DropIndex_Call {
return &DataCoordCatalog_DropIndex_Call{Call: _e.mock.On("DropIndex", ctx, collID, dropIdxID)}
}
......@@ -543,8 +543,8 @@ type DataCoordCatalog_DropSegment_Call struct {
}
// DropSegment is a helper method to define mock.On call
// - ctx context.Context
// - segment *datapb.SegmentInfo
// - ctx context.Context
// - segment *datapb.SegmentInfo
func (_e *DataCoordCatalog_Expecter) DropSegment(ctx interface{}, segment interface{}) *DataCoordCatalog_DropSegment_Call {
return &DataCoordCatalog_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segment)}
}
......@@ -581,11 +581,11 @@ type DataCoordCatalog_DropSegmentIndex_Call struct {
}
// DropSegmentIndex is a helper method to define mock.On call
// - ctx context.Context
// - collID int64
// - partID int64
// - segID int64
// - buildID int64
// - ctx context.Context
// - collID int64
// - partID int64
// - segID int64
// - buildID int64
func (_e *DataCoordCatalog_Expecter) DropSegmentIndex(ctx interface{}, collID interface{}, partID interface{}, segID interface{}, buildID interface{}) *DataCoordCatalog_DropSegmentIndex_Call {
return &DataCoordCatalog_DropSegmentIndex_Call{Call: _e.mock.On("DropSegmentIndex", ctx, collID, partID, segID, buildID)}
}
......@@ -602,6 +602,45 @@ func (_c *DataCoordCatalog_DropSegmentIndex_Call) Return(_a0 error) *DataCoordCa
return _c
}
// GcConfirm provides a mock function with given fields: ctx, collectionID, partitionID
func (_m *DataCoordCatalog) GcConfirm(ctx context.Context, collectionID int64, partitionID int64) bool {
ret := _m.Called(ctx, collectionID, partitionID)
var r0 bool
if rf, ok := ret.Get(0).(func(context.Context, int64, int64) bool); ok {
r0 = rf(ctx, collectionID, partitionID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// DataCoordCatalog_GcConfirm_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GcConfirm'
type DataCoordCatalog_GcConfirm_Call struct {
*mock.Call
}
// GcConfirm is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - partitionID int64
func (_e *DataCoordCatalog_Expecter) GcConfirm(ctx interface{}, collectionID interface{}, partitionID interface{}) *DataCoordCatalog_GcConfirm_Call {
return &DataCoordCatalog_GcConfirm_Call{Call: _e.mock.On("GcConfirm", ctx, collectionID, partitionID)}
}
func (_c *DataCoordCatalog_GcConfirm_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64)) *DataCoordCatalog_GcConfirm_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(int64))
})
return _c
}
func (_c *DataCoordCatalog_GcConfirm_Call) Return(_a0 bool) *DataCoordCatalog_GcConfirm_Call {
_c.Call.Return(_a0)
return _c
}
// IsChannelDropped provides a mock function with given fields: ctx, channel
func (_m *DataCoordCatalog) IsChannelDropped(ctx context.Context, channel string) bool {
ret := _m.Called(ctx, channel)
......@@ -622,8 +661,8 @@ type DataCoordCatalog_IsChannelDropped_Call struct {
}
// IsChannelDropped is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) IsChannelDropped(ctx interface{}, channel interface{}) *DataCoordCatalog_IsChannelDropped_Call {
return &DataCoordCatalog_IsChannelDropped_Call{Call: _e.mock.On("IsChannelDropped", ctx, channel)}
}
......@@ -669,7 +708,7 @@ type DataCoordCatalog_ListChannelCheckpoint_Call struct {
}
// ListChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoordCatalog_Expecter) ListChannelCheckpoint(ctx interface{}) *DataCoordCatalog_ListChannelCheckpoint_Call {
return &DataCoordCatalog_ListChannelCheckpoint_Call{Call: _e.mock.On("ListChannelCheckpoint", ctx)}
}
......@@ -715,7 +754,7 @@ type DataCoordCatalog_ListIndexes_Call struct {
}
// ListIndexes is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoordCatalog_Expecter) ListIndexes(ctx interface{}) *DataCoordCatalog_ListIndexes_Call {
return &DataCoordCatalog_ListIndexes_Call{Call: _e.mock.On("ListIndexes", ctx)}
}
......@@ -761,7 +800,7 @@ type DataCoordCatalog_ListSegmentIndexes_Call struct {
}
// ListSegmentIndexes is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoordCatalog_Expecter) ListSegmentIndexes(ctx interface{}) *DataCoordCatalog_ListSegmentIndexes_Call {
return &DataCoordCatalog_ListSegmentIndexes_Call{Call: _e.mock.On("ListSegmentIndexes", ctx)}
}
......@@ -807,7 +846,7 @@ type DataCoordCatalog_ListSegments_Call struct {
}
// ListSegments is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoordCatalog_Expecter) ListSegments(ctx interface{}) *DataCoordCatalog_ListSegments_Call {
return &DataCoordCatalog_ListSegments_Call{Call: _e.mock.On("ListSegments", ctx)}
}
......@@ -844,8 +883,8 @@ type DataCoordCatalog_MarkChannelDeleted_Call struct {
}
// MarkChannelDeleted is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - ctx context.Context
// - channel string
func (_e *DataCoordCatalog_Expecter) MarkChannelDeleted(ctx interface{}, channel interface{}) *DataCoordCatalog_MarkChannelDeleted_Call {
return &DataCoordCatalog_MarkChannelDeleted_Call{Call: _e.mock.On("MarkChannelDeleted", ctx, channel)}
}
......@@ -882,9 +921,9 @@ type DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call struct {
}
// RevertAlterSegmentsAndAddNewSegment is a helper method to define mock.On call
// - ctx context.Context
// - segments []*datapb.SegmentInfo
// - removalSegment *datapb.SegmentInfo
// - ctx context.Context
// - segments []*datapb.SegmentInfo
// - removalSegment *datapb.SegmentInfo
func (_e *DataCoordCatalog_Expecter) RevertAlterSegmentsAndAddNewSegment(ctx interface{}, segments interface{}, removalSegment interface{}) *DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call {
return &DataCoordCatalog_RevertAlterSegmentsAndAddNewSegment_Call{Call: _e.mock.On("RevertAlterSegmentsAndAddNewSegment", ctx, segments, removalSegment)}
}
......@@ -921,9 +960,9 @@ type DataCoordCatalog_SaveChannelCheckpoint_Call struct {
}
// SaveChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - vChannel string
// - pos *internalpb.MsgPosition
// - ctx context.Context
// - vChannel string
// - pos *internalpb.MsgPosition
func (_e *DataCoordCatalog_Expecter) SaveChannelCheckpoint(ctx interface{}, vChannel interface{}, pos interface{}) *DataCoordCatalog_SaveChannelCheckpoint_Call {
return &DataCoordCatalog_SaveChannelCheckpoint_Call{Call: _e.mock.On("SaveChannelCheckpoint", ctx, vChannel, pos)}
}
......@@ -960,8 +999,8 @@ type DataCoordCatalog_SaveDroppedSegmentsInBatch_Call struct {
}
// SaveDroppedSegmentsInBatch is a helper method to define mock.On call
// - ctx context.Context
// - segments []*datapb.SegmentInfo
// - ctx context.Context
// - segments []*datapb.SegmentInfo
func (_e *DataCoordCatalog_Expecter) SaveDroppedSegmentsInBatch(ctx interface{}, segments interface{}) *DataCoordCatalog_SaveDroppedSegmentsInBatch_Call {
return &DataCoordCatalog_SaveDroppedSegmentsInBatch_Call{Call: _e.mock.On("SaveDroppedSegmentsInBatch", ctx, segments)}
}
......
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
......
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
......@@ -9,6 +9,8 @@ import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
indexpb "github.com/milvus-io/milvus/internal/proto/indexpb"
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
milvuspb "github.com/milvus-io/milvus-proto/go-api/milvuspb"
......@@ -77,11 +79,11 @@ func (_c *DataCoord_AssignSegmentID_Call) Return(_a0 *datapb.AssignSegmentIDResp
}
// BroadcastAlteredCollection provides a mock function with given fields: ctx, req
func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
var r0 *commonpb.Status
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) *commonpb.Status); ok {
if rf, ok := ret.Get(0).(func(context.Context, *datapb.AlterCollectionRequest) *commonpb.Status); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
......@@ -90,7 +92,7 @@ func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvus
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionRequest) error); ok {
if rf, ok := ret.Get(1).(func(context.Context, *datapb.AlterCollectionRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
......@@ -106,14 +108,14 @@ type DataCoord_BroadcastAlteredCollection_Call struct {
// BroadcastAlteredCollection is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.AlterCollectionRequest
// - req *datapb.AlterCollectionRequest
func (_e *DataCoord_Expecter) BroadcastAlteredCollection(ctx interface{}, req interface{}) *DataCoord_BroadcastAlteredCollection_Call {
return &DataCoord_BroadcastAlteredCollection_Call{Call: _e.mock.On("BroadcastAlteredCollection", ctx, req)}
}
func (_c *DataCoord_BroadcastAlteredCollection_Call) Run(run func(ctx context.Context, req *milvuspb.AlterCollectionRequest)) *DataCoord_BroadcastAlteredCollection_Call {
func (_c *DataCoord_BroadcastAlteredCollection_Call) Run(run func(ctx context.Context, req *datapb.AlterCollectionRequest)) *DataCoord_BroadcastAlteredCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionRequest))
run(args[0].(context.Context), args[1].(*datapb.AlterCollectionRequest))
})
return _c
}
......@@ -170,6 +172,147 @@ func (_c *DataCoord_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse,
return _c
}
// CreateIndex provides a mock function with given fields: ctx, req
func (_m *DataCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
var r0 *commonpb.Status
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.CreateIndexRequest) *commonpb.Status); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.CreateIndexRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_CreateIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateIndex'
type DataCoord_CreateIndex_Call struct {
*mock.Call
}
// CreateIndex is a helper method to define mock.On call
// - ctx context.Context
// - req *indexpb.CreateIndexRequest
func (_e *DataCoord_Expecter) CreateIndex(ctx interface{}, req interface{}) *DataCoord_CreateIndex_Call {
return &DataCoord_CreateIndex_Call{Call: _e.mock.On("CreateIndex", ctx, req)}
}
func (_c *DataCoord_CreateIndex_Call) Run(run func(ctx context.Context, req *indexpb.CreateIndexRequest)) *DataCoord_CreateIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.CreateIndexRequest))
})
return _c
}
func (_c *DataCoord_CreateIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *DataCoord_CreateIndex_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// DescribeIndex provides a mock function with given fields: ctx, req
func (_m *DataCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
ret := _m.Called(ctx, req)
var r0 *indexpb.DescribeIndexResponse
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.DescribeIndexRequest) *indexpb.DescribeIndexResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*indexpb.DescribeIndexResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.DescribeIndexRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_DescribeIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeIndex'
type DataCoord_DescribeIndex_Call struct {
*mock.Call
}
// DescribeIndex is a helper method to define mock.On call
// - ctx context.Context
// - req *indexpb.DescribeIndexRequest
func (_e *DataCoord_Expecter) DescribeIndex(ctx interface{}, req interface{}) *DataCoord_DescribeIndex_Call {
return &DataCoord_DescribeIndex_Call{Call: _e.mock.On("DescribeIndex", ctx, req)}
}
func (_c *DataCoord_DescribeIndex_Call) Run(run func(ctx context.Context, req *indexpb.DescribeIndexRequest)) *DataCoord_DescribeIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.DescribeIndexRequest))
})
return _c
}
func (_c *DataCoord_DescribeIndex_Call) Return(_a0 *indexpb.DescribeIndexResponse, _a1 error) *DataCoord_DescribeIndex_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// DropIndex provides a mock function with given fields: ctx, req
func (_m *DataCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
var r0 *commonpb.Status
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.DropIndexRequest) *commonpb.Status); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.DropIndexRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_DropIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropIndex'
type DataCoord_DropIndex_Call struct {
*mock.Call
}
// DropIndex is a helper method to define mock.On call
// - ctx context.Context
// - req *indexpb.DropIndexRequest
func (_e *DataCoord_Expecter) DropIndex(ctx interface{}, req interface{}) *DataCoord_DropIndex_Call {
return &DataCoord_DropIndex_Call{Call: _e.mock.On("DropIndex", ctx, req)}
}
func (_c *DataCoord_DropIndex_Call) Run(run func(ctx context.Context, req *indexpb.DropIndexRequest)) *DataCoord_DropIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.DropIndexRequest))
})
return _c
}
func (_c *DataCoord_DropIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *DataCoord_DropIndex_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// DropVirtualChannel provides a mock function with given fields: ctx, req
func (_m *DataCoord) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
ret := _m.Called(ctx, req)
......@@ -264,6 +407,53 @@ func (_c *DataCoord_Flush_Call) Return(_a0 *datapb.FlushResponse, _a1 error) *Da
return _c
}
// GcConfirm provides a mock function with given fields: ctx, request
func (_m *DataCoord) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
ret := _m.Called(ctx, request)
var r0 *datapb.GcConfirmResponse
if rf, ok := ret.Get(0).(func(context.Context, *datapb.GcConfirmRequest) *datapb.GcConfirmResponse); ok {
r0 = rf(ctx, request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.GcConfirmResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *datapb.GcConfirmRequest) error); ok {
r1 = rf(ctx, request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_GcConfirm_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GcConfirm'
type DataCoord_GcConfirm_Call struct {
*mock.Call
}
// GcConfirm is a helper method to define mock.On call
// - ctx context.Context
// - request *datapb.GcConfirmRequest
func (_e *DataCoord_Expecter) GcConfirm(ctx interface{}, request interface{}) *DataCoord_GcConfirm_Call {
return &DataCoord_GcConfirm_Call{Call: _e.mock.On("GcConfirm", ctx, request)}
}
func (_c *DataCoord_GcConfirm_Call) Run(run func(ctx context.Context, request *datapb.GcConfirmRequest)) *DataCoord_GcConfirm_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.GcConfirmRequest))
})
return _c
}
func (_c *DataCoord_GcConfirm_Call) Return(_a0 *datapb.GcConfirmResponse, _a1 error) *DataCoord_GcConfirm_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetCollectionStatistics provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
ret := _m.Called(ctx, req)
......@@ -545,6 +735,147 @@ func (_c *DataCoord_GetFlushedSegments_Call) Return(_a0 *datapb.GetFlushedSegmen
return _c
}
// GetIndexBuildProgress provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
ret := _m.Called(ctx, req)
var r0 *indexpb.GetIndexBuildProgressResponse
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetIndexBuildProgressRequest) *indexpb.GetIndexBuildProgressResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*indexpb.GetIndexBuildProgressResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetIndexBuildProgressRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_GetIndexBuildProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexBuildProgress'
type DataCoord_GetIndexBuildProgress_Call struct {
*mock.Call
}
// GetIndexBuildProgress is a helper method to define mock.On call
// - ctx context.Context
// - req *indexpb.GetIndexBuildProgressRequest
func (_e *DataCoord_Expecter) GetIndexBuildProgress(ctx interface{}, req interface{}) *DataCoord_GetIndexBuildProgress_Call {
return &DataCoord_GetIndexBuildProgress_Call{Call: _e.mock.On("GetIndexBuildProgress", ctx, req)}
}
func (_c *DataCoord_GetIndexBuildProgress_Call) Run(run func(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest)) *DataCoord_GetIndexBuildProgress_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.GetIndexBuildProgressRequest))
})
return _c
}
func (_c *DataCoord_GetIndexBuildProgress_Call) Return(_a0 *indexpb.GetIndexBuildProgressResponse, _a1 error) *DataCoord_GetIndexBuildProgress_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetIndexInfos provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
ret := _m.Called(ctx, req)
var r0 *indexpb.GetIndexInfoResponse
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetIndexInfoRequest) *indexpb.GetIndexInfoResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*indexpb.GetIndexInfoResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetIndexInfoRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_GetIndexInfos_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexInfos'
type DataCoord_GetIndexInfos_Call struct {
*mock.Call
}
// GetIndexInfos is a helper method to define mock.On call
// - ctx context.Context
// - req *indexpb.GetIndexInfoRequest
func (_e *DataCoord_Expecter) GetIndexInfos(ctx interface{}, req interface{}) *DataCoord_GetIndexInfos_Call {
return &DataCoord_GetIndexInfos_Call{Call: _e.mock.On("GetIndexInfos", ctx, req)}
}
func (_c *DataCoord_GetIndexInfos_Call) Run(run func(ctx context.Context, req *indexpb.GetIndexInfoRequest)) *DataCoord_GetIndexInfos_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.GetIndexInfoRequest))
})
return _c
}
func (_c *DataCoord_GetIndexInfos_Call) Return(_a0 *indexpb.GetIndexInfoResponse, _a1 error) *DataCoord_GetIndexInfos_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetIndexState provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
ret := _m.Called(ctx, req)
var r0 *indexpb.GetIndexStateResponse
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetIndexStateRequest) *indexpb.GetIndexStateResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*indexpb.GetIndexStateResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetIndexStateRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_GetIndexState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexState'
type DataCoord_GetIndexState_Call struct {
*mock.Call
}
// GetIndexState is a helper method to define mock.On call
// - ctx context.Context
// - req *indexpb.GetIndexStateRequest
func (_e *DataCoord_Expecter) GetIndexState(ctx interface{}, req interface{}) *DataCoord_GetIndexState_Call {
return &DataCoord_GetIndexState_Call{Call: _e.mock.On("GetIndexState", ctx, req)}
}
func (_c *DataCoord_GetIndexState_Call) Run(run func(ctx context.Context, req *indexpb.GetIndexStateRequest)) *DataCoord_GetIndexState_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.GetIndexStateRequest))
})
return _c
}
func (_c *DataCoord_GetIndexState_Call) Return(_a0 *indexpb.GetIndexStateResponse, _a1 error) *DataCoord_GetIndexState_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetInsertBinlogPaths provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
ret := _m.Called(ctx, req)
......@@ -733,6 +1064,53 @@ func (_c *DataCoord_GetRecoveryInfo_Call) Return(_a0 *datapb.GetRecoveryInfoResp
return _c
}
// GetSegmentIndexState provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
ret := _m.Called(ctx, req)
var r0 *indexpb.GetSegmentIndexStateResponse
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.GetSegmentIndexStateRequest) *indexpb.GetSegmentIndexStateResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*indexpb.GetSegmentIndexStateResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.GetSegmentIndexStateRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_GetSegmentIndexState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentIndexState'
type DataCoord_GetSegmentIndexState_Call struct {
*mock.Call
}
// GetSegmentIndexState is a helper method to define mock.On call
// - ctx context.Context
// - req *indexpb.GetSegmentIndexStateRequest
func (_e *DataCoord_Expecter) GetSegmentIndexState(ctx interface{}, req interface{}) *DataCoord_GetSegmentIndexState_Call {
return &DataCoord_GetSegmentIndexState_Call{Call: _e.mock.On("GetSegmentIndexState", ctx, req)}
}
func (_c *DataCoord_GetSegmentIndexState_Call) Run(run func(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest)) *DataCoord_GetSegmentIndexState_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.GetSegmentIndexStateRequest))
})
return _c
}
func (_c *DataCoord_GetSegmentIndexState_Call) Return(_a0 *indexpb.GetSegmentIndexStateResponse, _a1 error) *DataCoord_GetSegmentIndexState_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetSegmentInfo provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
ret := _m.Called(ctx, req)
......@@ -1532,6 +1910,53 @@ func (_c *DataCoord_UnsetIsImportingState_Call) Return(_a0 *commonpb.Status, _a1
return _c
}
// UpdateChannelCheckpoint provides a mock function with given fields: ctx, req
func (_m *DataCoord) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
var r0 *commonpb.Status
if rf, ok := ret.Get(0).(func(context.Context, *datapb.UpdateChannelCheckpointRequest) *commonpb.Status); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *datapb.UpdateChannelCheckpointRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_UpdateChannelCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateChannelCheckpoint'
type DataCoord_UpdateChannelCheckpoint_Call struct {
*mock.Call
}
// UpdateChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateChannelCheckpointRequest
func (_e *DataCoord_Expecter) UpdateChannelCheckpoint(ctx interface{}, req interface{}) *DataCoord_UpdateChannelCheckpoint_Call {
return &DataCoord_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, req)}
}
func (_c *DataCoord_UpdateChannelCheckpoint_Call) Run(run func(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest)) *DataCoord_UpdateChannelCheckpoint_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.UpdateChannelCheckpointRequest))
})
return _c
}
func (_c *DataCoord_UpdateChannelCheckpoint_Call) Return(_a0 *commonpb.Status, _a1 error) *DataCoord_UpdateChannelCheckpoint_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// UpdateSegmentStatistics provides a mock function with given fields: ctx, req
func (_m *DataCoord) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
......
此差异已折叠。
......@@ -77,6 +77,8 @@ service DataCoord {
rpc DescribeIndex(index.DescribeIndexRequest) returns (index.DescribeIndexResponse) {}
// Deprecated: use DescribeIndex instead
rpc GetIndexBuildProgress(index.GetIndexBuildProgressRequest) returns (index.GetIndexBuildProgressResponse) {}
rpc GcConfirm(GcConfirmRequest) returns (GcConfirmResponse) {}
}
service DataNode {
......@@ -643,6 +645,16 @@ message AlterCollectionRequest {
repeated common.KeyValuePair properties = 5;
}
message GcConfirmRequest {
int64 collection_id = 1;
int64 partition_id = 2; // -1 means whole collection.
}
message GcConfirmResponse {
common.Status status = 1;
bool gc_finished = 2;
}
//message IndexInfo {
// int64 collectionID = 1;
// int64 fieldID = 2;
......
......@@ -18,20 +18,22 @@ package proxy
import (
"context"
"sync/atomic"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
"go.uber.org/atomic"
)
type DataCoordMock struct {
types.DataCoord
nodeID typeutil.UniqueID
address string
......
......@@ -40,6 +40,7 @@ type Broker interface {
UnsetIsImportingState(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
MarkSegmentsDropped(context.Context, *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error)
GetSegmentStates(context.Context, *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool
DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
......@@ -235,3 +236,15 @@ func (b *ServerBroker) DescribeIndex(ctx context.Context, colID UniqueID) (*inde
CollectionID: colID,
})
}
func (b *ServerBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
req := &datapb.GcConfirmRequest{CollectionId: collectionID, PartitionId: partitionID}
resp, err := b.s.dataCoord.GcConfirm(ctx, req)
if err != nil {
return false
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return false
}
return resp.GetGcFinished()
}
......@@ -5,15 +5,15 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestServerBroker_ReleaseCollection(t *testing.T) {
......@@ -304,3 +304,42 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) {
assert.NoError(t, err)
})
}
func TestServerBroker_GcConfirm(t *testing.T) {
t.Run("invalid datacoord", func(t *testing.T) {
dc := mocks.NewDataCoord(t)
dc.On("GcConfirm",
mock.Anything, // context.Context
mock.Anything, // *datapb.GcConfirmRequest
).Return(nil, errors.New("error mock GcConfirm"))
c := newTestCore(withDataCoord(dc))
broker := newServerBroker(c)
assert.False(t, broker.GcConfirm(context.Background(), 100, 10000))
})
t.Run("non success", func(t *testing.T) {
dc := mocks.NewDataCoord(t)
dc.On("GcConfirm",
mock.Anything, // context.Context
mock.Anything, // *datapb.GcConfirmRequest
).Return(
&datapb.GcConfirmResponse{Status: failStatus(commonpb.ErrorCode_UnexpectedError, "error mock GcConfirm")},
nil)
c := newTestCore(withDataCoord(dc))
broker := newServerBroker(c)
assert.False(t, broker.GcConfirm(context.Background(), 100, 10000))
})
t.Run("normal case", func(t *testing.T) {
dc := mocks.NewDataCoord(t)
dc.On("GcConfirm",
mock.Anything, // context.Context
mock.Anything, // *datapb.GcConfirmRequest
).Return(
&datapb.GcConfirmResponse{Status: succStatus(), GcFinished: true},
nil)
c := newTestCore(withDataCoord(dc))
broker := newServerBroker(c)
assert.True(t, broker.GcConfirm(context.Background(), 100, 10000))
})
}
......@@ -91,6 +91,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: t.core},
pChannels: collMeta.PhysicalChannelNames,
})
redoTask.AddAsyncStep(newConfirmGCStep(t.core, collMeta.CollectionID, allPartition))
redoTask.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: collMeta.CollectionID,
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/milvus-io/milvus/internal/common"
......@@ -148,6 +149,9 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
t.Run("normal case, redo", func(t *testing.T) {
defer cleanTestEnv()
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
collectionName := funcutil.GenRandomStr()
shardNum := 2
......@@ -187,8 +191,12 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropIndexCalled = true
dropIndexChan <- struct{}{}
time.Sleep(confirmGCInterval)
return nil
}
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return true
}
gc := newMockGarbageCollector()
deleteCollectionCalled := false
......
......@@ -79,6 +79,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
CollectionID: t.collMeta.CollectionID,
},
})
redoTask.AddAsyncStep(newConfirmGCStep(t.core, t.collMeta.CollectionID, partID))
redoTask.AddAsyncStep(&removePartitionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: t.collMeta.CollectionID,
......
......@@ -3,6 +3,7 @@ package rootcoord
import (
"context"
"testing"
"time"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
......@@ -125,6 +126,9 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
})
t.Run("normal case", func(t *testing.T) {
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
collectionName := funcutil.GenRandomStr()
partitionName := funcutil.GenRandomStr()
coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{{PartitionName: partitionName}}}
......@@ -146,10 +150,23 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition) (Timestamp, error) {
deletePartitionChan <- struct{}{}
deletePartitionCalled = true
time.Sleep(confirmGCInterval)
return 0, nil
}
core := newTestCore(withValidProxyManager(), withMeta(meta), withGarbageCollector(gc), withDropIndex())
broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return true
}
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
return nil
}
core := newTestCore(
withValidProxyManager(),
withMeta(meta),
withGarbageCollector(gc),
withBroker(broker))
task := &dropPartitionTask{
baseTask: baseTask{core: core},
......
......@@ -35,7 +35,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/contextutil"
......@@ -43,29 +42,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
// TimestampPrefix prefix for timestamp
TimestampPrefix = rootcoord.ComponentPrefix + "/timestamp"
// CreateCollectionDDType name of DD type for create collection
CreateCollectionDDType = "CreateCollection"
// DropCollectionDDType name of DD type for drop collection
DropCollectionDDType = "DropCollection"
// CreatePartitionDDType name of DD type for create partition
CreatePartitionDDType = "CreatePartition"
// DropPartitionDDType name of DD type for drop partition
DropPartitionDDType = "DropPartition"
// DefaultIndexType name of default index type for scalar field
DefaultIndexType = "STL_SORT"
// DefaultStringIndexType name of default index type for varChar/string field
DefaultStringIndexType = "Trie"
)
//go:generate mockery --name=IMetaTable --outpkg=mockrootcoord
type IMetaTable interface {
AddCollection(ctx context.Context, coll *model.Collection) error
......
......@@ -765,6 +765,8 @@ type mockBroker struct {
GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
BroadcastAlteredCollectionFunc func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error
GCConfirmFunc func(ctx context.Context, collectionID, partitionID UniqueID) bool
}
func newMockBroker() *mockBroker {
......@@ -799,6 +801,10 @@ func (b mockBroker) BroadcastAlteredCollection(ctx context.Context, req *milvusp
return b.BroadcastAlteredCollectionFunc(ctx, req)
}
func (b mockBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
return b.GCConfirmFunc(ctx, collectionID, partitionID)
}
func withBroker(b Broker) Opt {
return func(c *Core) {
c.broker = b
......
......@@ -3,6 +3,7 @@ package rootcoord
import (
"context"
"fmt"
"time"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
......@@ -14,10 +15,10 @@ import (
type stepPriority int
const (
stepPriorityLow = iota
stepPriorityNormal
stepPriorityImportant
stepPriorityUrgent
stepPriorityLow = 0
stepPriorityNormal = 1
stepPriorityImportant = 10
stepPriorityUrgent = 1000
)
type nestedStep interface {
......@@ -374,3 +375,50 @@ func (b *BroadcastAlteredCollectionStep) Execute(ctx context.Context) ([]nestedS
func (b *BroadcastAlteredCollectionStep) Desc() string {
return fmt.Sprintf("broadcast altered collection, collectionID: %d", b.req.CollectionID)
}
var (
confirmGCInterval = time.Minute * 20
allPartition UniqueID = -1
)
type confirmGCStep struct {
baseStep
collectionID UniqueID
partitionID UniqueID
lastScheduledTime time.Time
}
func newConfirmGCStep(core *Core, collectionID, partitionID UniqueID) *confirmGCStep {
return &confirmGCStep{
baseStep: baseStep{core: core},
collectionID: collectionID,
partitionID: partitionID,
lastScheduledTime: time.Now(),
}
}
func (b *confirmGCStep) Execute(ctx context.Context) ([]nestedStep, error) {
if time.Since(b.lastScheduledTime) < confirmGCInterval {
return nil, fmt.Errorf("wait for reschedule to confirm GC, collection: %d, partition: %d, last scheduled time: %s, now: %s",
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
}
finished := b.core.broker.GcConfirm(ctx, b.collectionID, b.partitionID)
if finished {
return nil, nil
}
b.lastScheduledTime = time.Now()
return nil, fmt.Errorf("GC is not finished, collection: %d, partition: %d, last scheduled time: %s, now: %s",
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
}
func (b *confirmGCStep) Desc() string {
return fmt.Sprintf("wait for GC finished, collection: %d, partition: %d, last scheduled time: %s, now: %s",
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
}
func (b *confirmGCStep) Weight() stepPriority {
return stepPriorityLow
}
......@@ -2,6 +2,7 @@ package rootcoord
import (
"context"
"sort"
"sync"
"time"
......@@ -25,14 +26,26 @@ type stepStack struct {
steps []nestedStep
}
func (s *stepStack) totalPriority() int {
total := 0
for _, step := range s.steps {
total += int(step.Weight())
}
return total
}
func (s *stepStack) Execute(ctx context.Context) *stepStack {
steps := s.steps
for len(steps) > 0 {
l := len(steps)
todo := steps[l-1]
childSteps, err := todo.Execute(ctx)
// TODO: maybe a interface `step.LogOnError` is better.
_, skipLog := todo.(*waitForTsSyncedStep)
_, isWaitForTsSyncedStep := todo.(*waitForTsSyncedStep)
_, isConfirmGCStep := todo.(*confirmGCStep)
skipLog := isWaitForTsSyncedStep || isConfirmGCStep
if retry.IsUnRecoverable(err) {
if !skipLog {
log.Warn("failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc()))
......@@ -76,8 +89,28 @@ func randomSelectPolicy(parallel int) selectStepPolicy {
}
}
func selectByPriority(parallel int, m map[*stepStack]struct{}) []*stepStack {
h := make([]*stepStack, 0, len(m))
for k := range m {
h = append(h, k)
}
sort.Slice(h, func(i, j int) bool {
return h[i].totalPriority() > h[j].totalPriority()
})
if len(h) <= parallel {
return h
}
return h[:parallel]
}
func selectByPriorityPolicy(parallel int) selectStepPolicy {
return func(m map[*stepStack]struct{}) []*stepStack {
return selectByPriority(parallel, m)
}
}
func defaultSelectPolicy() selectStepPolicy {
return randomSelectPolicy(defaultBgExecutingParallel)
return selectByPriorityPolicy(defaultBgExecutingParallel)
}
type bgOpt func(*bgStepExecutor)
......
......@@ -8,7 +8,6 @@ import (
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
)
......@@ -128,7 +127,7 @@ func Test_randomSelect(t *testing.T) {
func Test_bgStepExecutor_scheduleLoop(t *testing.T) {
bg := newBgStepExecutor(context.Background(),
withSelectStepPolicy(defaultSelectPolicy()),
withSelectStepPolicy(randomSelectPolicy(defaultBgExecutingParallel)),
withBgInterval(time.Millisecond*10))
bg.Start()
n := 20
......@@ -173,3 +172,43 @@ func Test_bgStepExecutor_scheduleLoop(t *testing.T) {
}
bg.Stop()
}
func Test_selectByPriorityPolicy(t *testing.T) {
policy := selectByPriorityPolicy(4)
t.Run("select all", func(t *testing.T) {
m := map[*stepStack]struct{}{
{steps: []nestedStep{}}: {},
{steps: []nestedStep{}}: {},
}
selected := policy(m)
assert.Equal(t, 2, len(selected))
})
t.Run("select by priority", func(t *testing.T) {
steps := []nestedStep{
&releaseCollectionStep{},
&releaseCollectionStep{},
&releaseCollectionStep{},
&releaseCollectionStep{},
&releaseCollectionStep{},
}
s1 := &stepStack{steps: steps[0:1]}
s2 := &stepStack{steps: steps[0:2]}
s3 := &stepStack{steps: steps[0:3]}
s4 := &stepStack{steps: steps[0:4]}
s5 := &stepStack{steps: steps[0:5]}
m := map[*stepStack]struct{}{
s1: {},
s2: {},
s3: {},
s4: {},
s5: {},
}
selected := policy(m)
assert.Equal(t, 4, len(selected))
for i := 1; i < len(selected); i++ {
assert.True(t, selected[i].totalPriority() <= selected[i-1].totalPriority())
}
})
}
......@@ -3,6 +3,7 @@ package rootcoord
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
......@@ -27,3 +28,54 @@ func Test_waitForTsSyncedStep_Execute(t *testing.T) {
assert.Equal(t, 0, len(children))
assert.NoError(t, err)
}
func restoreConfirmGCInterval() {
confirmGCInterval = time.Minute * 20
}
func Test_confirmGCStep_Execute(t *testing.T) {
t.Run("wait for reschedule", func(t *testing.T) {
confirmGCInterval = time.Minute * 1000
defer restoreConfirmGCInterval()
s := &confirmGCStep{lastScheduledTime: time.Now()}
_, err := s.Execute(context.TODO())
assert.Error(t, err)
})
t.Run("GC not finished", func(t *testing.T) {
broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return false
}
core := newTestCore(withBroker(broker))
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
s := newConfirmGCStep(core, 100, 1000)
time.Sleep(confirmGCInterval)
_, err := s.Execute(context.TODO())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return true
}
core := newTestCore(withBroker(broker))
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
s := newConfirmGCStep(core, 100, 1000)
time.Sleep(confirmGCInterval)
_, err := s.Execute(context.TODO())
assert.NoError(t, err)
})
}
......@@ -331,6 +331,8 @@ type DataCoord interface {
CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)
GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error)
// CreateIndex create an index on collection.
// Index building is asynchronous, so when an index building request comes, an IndexID is assigned to the task and
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
......
......@@ -19,16 +19,17 @@ package mock
import (
"context"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"google.golang.org/grpc"
)
// DataCoordClient mocks of DataCoordClient
type DataCoordClient struct {
types.DataCoord
Err error
}
......
......@@ -36,6 +36,10 @@ type GrpcDataCoordClient struct {
Err error
}
func (m *GrpcDataCoordClient) GcConfirm(ctx context.Context, in *datapb.GcConfirmRequest, opts ...grpc.CallOption) (*datapb.GcConfirmResponse, error) {
return &datapb.GcConfirmResponse{}, m.Err
}
func (m *GrpcDataCoordClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.Err
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册