未验证 提交 a13c375c 编写于 作者: J Jiquan Long 提交者: GitHub

Remove collection meta after GC finished (#21595) (#21671)

Signed-off-by: Nlongjiquan <jiquan.long@zilliz.com>
上级 845719c2
......@@ -299,6 +299,9 @@ mock-datanode:
mock-rootcoord:
mockery --name=RootCoord --dir=$(PWD)/internal/types --output=$(PWD)/internal/mocks --filename=mock_rootcoord.go --with-expecter
mock-datacoord:
mockery --name=DataCoord --dir=$(PWD)/internal/types --output=$(PWD)/internal/mocks --filename=mock_datacoord.go --with-expecter
mock-tnx-kv:
mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=TxnKV.go --with-expecter
......
......@@ -1221,6 +1221,10 @@ func (m *meta) DropChannelCheckpoint(vChannel string) error {
return nil
}
func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
return m.catalog.GcConfirm(ctx, collectionID, partitionID)
}
// addNewSeg update metrics update for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, rowCount int64) {
s.stateChange[state.String()]++
......
......@@ -25,18 +25,18 @@ import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestMetaReloadFromKV(t *testing.T) {
......@@ -964,3 +964,17 @@ func TestChannelCP(t *testing.T) {
assert.NoError(t, err)
})
}
func Test_meta_GcConfirm(t *testing.T) {
m := &meta{}
catalog := mocks.NewDataCoordCatalog(t)
m.catalog = catalog
catalog.On("GcConfirm",
mock.Anything,
mock.AnythingOfType("int64"),
mock.AnythingOfType("int64")).
Return(false)
assert.False(t, m.GcConfirm(context.TODO(), 100, 10000))
}
......@@ -1475,3 +1475,21 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
return &milvuspb.CheckHealthResponse{IsHealthy: true, Reasons: errReasons}, nil
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
resp := &datapb.GcConfirmResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
GcFinished: false,
}
if s.isClosed() {
resp.Status.Reason = msgDataCoordIsUnhealthy(Params.DataCoordCfg.GetNodeID())
return resp, nil
}
resp.GcFinished = s.meta.GcConfirm(ctx, request.GetCollectionId(), request.GetPartitionId())
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
......@@ -4,10 +4,11 @@ import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestBroadcastAlteredCollection(t *testing.T) {
......@@ -54,3 +55,35 @@ func TestBroadcastAlteredCollection(t *testing.T) {
assert.NotNil(t, s.meta.collections[1].Properties)
})
}
func TestServer_GcConfirm(t *testing.T) {
t.Run("closed server", func(t *testing.T) {
s := &Server{}
s.stateCode.Store(commonpb.StateCode_Initializing)
resp, err := s.GcConfirm(context.TODO(), &datapb.GcConfirmRequest{CollectionId: 100, PartitionId: 10000})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case", func(t *testing.T) {
s := &Server{}
s.stateCode.Store(commonpb.StateCode_Healthy)
m := &meta{}
catalog := mocks.NewDataCoordCatalog(t)
m.catalog = catalog
catalog.On("GcConfirm",
mock.Anything,
mock.AnythingOfType("int64"),
mock.AnythingOfType("int64")).
Return(false)
s.meta = m
resp, err := s.GcConfirm(context.TODO(), &datapb.GcConfirmRequest{CollectionId: 100, PartitionId: 10000})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.False(t, resp.GetGcFinished())
})
}
......@@ -796,3 +796,16 @@ func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}
return ret.(*milvuspb.CheckHealthResponse), err
}
func (c *Client) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.GcConfirm(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GcConfirmResponse), err
}
......@@ -418,3 +418,7 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, request *datapb
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return s.dataCoord.CheckHealth(ctx, req)
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
return s.dataCoord.GcConfirm(ctx, request)
}
......@@ -33,6 +33,8 @@ import (
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockDataCoord struct {
types.DataCoord
states *milvuspb.ComponentStates
status *commonpb.Status
err error
......
......@@ -620,6 +620,10 @@ func (m *MockDataCoord) CheckHealth(ctx context.Context, req *milvuspb.CheckHeal
return nil, nil
}
func (m *MockDataCoord) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockProxy struct {
MockBase
......
......@@ -82,10 +82,10 @@ type MetaKv_CompareValueAndSwap_Call struct {
}
// CompareValueAndSwap is a helper method to define mock.On call
// - key string
// - value string
// - target string
// - opts ...clientv3.OpOption
// - key string
// - value string
// - target string
// - opts ...clientv3.OpOption
func (_e *MetaKv_Expecter) CompareValueAndSwap(key interface{}, value interface{}, target interface{}, opts ...interface{}) *MetaKv_CompareValueAndSwap_Call {
return &MetaKv_CompareValueAndSwap_Call{Call: _e.mock.On("CompareValueAndSwap",
append([]interface{}{key, value, target}, opts...)...)}
......@@ -143,10 +143,10 @@ type MetaKv_CompareVersionAndSwap_Call struct {
}
// CompareVersionAndSwap is a helper method to define mock.On call
// - key string
// - version int64
// - target string
// - opts ...clientv3.OpOption
// - key string
// - version int64
// - target string
// - opts ...clientv3.OpOption
func (_e *MetaKv_Expecter) CompareVersionAndSwap(key interface{}, version interface{}, target interface{}, opts ...interface{}) *MetaKv_CompareVersionAndSwap_Call {
return &MetaKv_CompareVersionAndSwap_Call{Call: _e.mock.On("CompareVersionAndSwap",
append([]interface{}{key, version, target}, opts...)...)}
......@@ -190,7 +190,7 @@ type MetaKv_GetPath_Call struct {
}
// GetPath is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) GetPath(key interface{}) *MetaKv_GetPath_Call {
return &MetaKv_GetPath_Call{Call: _e.mock.On("GetPath", key)}
}
......@@ -234,7 +234,7 @@ type MetaKv_Grant_Call struct {
}
// Grant is a helper method to define mock.On call
// - ttl int64
// - ttl int64
func (_e *MetaKv_Expecter) Grant(ttl interface{}) *MetaKv_Grant_Call {
return &MetaKv_Grant_Call{Call: _e.mock.On("Grant", ttl)}
}
......@@ -280,7 +280,7 @@ type MetaKv_KeepAlive_Call struct {
}
// KeepAlive is a helper method to define mock.On call
// - id clientv3.LeaseID
// - id clientv3.LeaseID
func (_e *MetaKv_Expecter) KeepAlive(id interface{}) *MetaKv_KeepAlive_Call {
return &MetaKv_KeepAlive_Call{Call: _e.mock.On("KeepAlive", id)}
}
......@@ -324,7 +324,7 @@ type MetaKv_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) Load(key interface{}) *MetaKv_Load_Call {
return &MetaKv_Load_Call{Call: _e.mock.On("Load", key)}
}
......@@ -379,7 +379,7 @@ type MetaKv_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithPrefix(key interface{}) *MetaKv_LoadWithPrefix_Call {
return &MetaKv_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)}
}
......@@ -443,7 +443,7 @@ type MetaKv_LoadWithPrefix2_Call struct {
}
// LoadWithPrefix2 is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithPrefix2(key interface{}) *MetaKv_LoadWithPrefix2_Call {
return &MetaKv_LoadWithPrefix2_Call{Call: _e.mock.On("LoadWithPrefix2", key)}
}
......@@ -505,7 +505,7 @@ type MetaKv_LoadWithRevision_Call struct {
}
// LoadWithRevision is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithRevision(key interface{}) *MetaKv_LoadWithRevision_Call {
return &MetaKv_LoadWithRevision_Call{Call: _e.mock.On("LoadWithRevision", key)}
}
......@@ -576,7 +576,7 @@ type MetaKv_LoadWithRevisionAndVersions_Call struct {
}
// LoadWithRevisionAndVersions is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) LoadWithRevisionAndVersions(key interface{}) *MetaKv_LoadWithRevisionAndVersions_Call {
return &MetaKv_LoadWithRevisionAndVersions_Call{Call: _e.mock.On("LoadWithRevisionAndVersions", key)}
}
......@@ -622,7 +622,7 @@ type MetaKv_MultiLoad_Call struct {
}
// MultiLoad is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *MetaKv_Expecter) MultiLoad(keys interface{}) *MetaKv_MultiLoad_Call {
return &MetaKv_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)}
}
......@@ -659,7 +659,7 @@ type MetaKv_MultiRemove_Call struct {
}
// MultiRemove is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *MetaKv_Expecter) MultiRemove(keys interface{}) *MetaKv_MultiRemove_Call {
return &MetaKv_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)}
}
......@@ -696,7 +696,7 @@ type MetaKv_MultiRemoveWithPrefix_Call struct {
}
// MultiRemoveWithPrefix is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *MetaKv_Expecter) MultiRemoveWithPrefix(keys interface{}) *MetaKv_MultiRemoveWithPrefix_Call {
return &MetaKv_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)}
}
......@@ -733,7 +733,7 @@ type MetaKv_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - kvs map[string]string
func (_e *MetaKv_Expecter) MultiSave(kvs interface{}) *MetaKv_MultiSave_Call {
return &MetaKv_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)}
}
......@@ -770,8 +770,8 @@ type MetaKv_MultiSaveAndRemove_Call struct {
}
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *MetaKv_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemove_Call {
return &MetaKv_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
}
......@@ -808,8 +808,8 @@ type MetaKv_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *MetaKv_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemoveWithPrefix_Call {
return &MetaKv_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
}
......@@ -846,7 +846,7 @@ type MetaKv_Remove_Call struct {
}
// Remove is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) Remove(key interface{}) *MetaKv_Remove_Call {
return &MetaKv_Remove_Call{Call: _e.mock.On("Remove", key)}
}
......@@ -883,7 +883,7 @@ type MetaKv_RemoveWithPrefix_Call struct {
}
// RemoveWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) RemoveWithPrefix(key interface{}) *MetaKv_RemoveWithPrefix_Call {
return &MetaKv_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)}
}
......@@ -920,8 +920,8 @@ type MetaKv_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *MetaKv_Expecter) Save(key interface{}, value interface{}) *MetaKv_Save_Call {
return &MetaKv_Save_Call{Call: _e.mock.On("Save", key, value)}
}
......@@ -958,8 +958,8 @@ type MetaKv_SaveWithIgnoreLease_Call struct {
}
// SaveWithIgnoreLease is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *MetaKv_Expecter) SaveWithIgnoreLease(key interface{}, value interface{}) *MetaKv_SaveWithIgnoreLease_Call {
return &MetaKv_SaveWithIgnoreLease_Call{Call: _e.mock.On("SaveWithIgnoreLease", key, value)}
}
......@@ -996,9 +996,9 @@ type MetaKv_SaveWithLease_Call struct {
}
// SaveWithLease is a helper method to define mock.On call
// - key string
// - value string
// - id clientv3.LeaseID
// - key string
// - value string
// - id clientv3.LeaseID
func (_e *MetaKv_Expecter) SaveWithLease(key interface{}, value interface{}, id interface{}) *MetaKv_SaveWithLease_Call {
return &MetaKv_SaveWithLease_Call{Call: _e.mock.On("SaveWithLease", key, value, id)}
}
......@@ -1035,9 +1035,9 @@ type MetaKv_WalkWithPrefix_Call struct {
}
// WalkWithPrefix is a helper method to define mock.On call
// - prefix string
// - paginationSize int
// - fn func([]byte , []byte) error
// - prefix string
// - paginationSize int
// - fn func([]byte , []byte) error
func (_e *MetaKv_Expecter) WalkWithPrefix(prefix interface{}, paginationSize interface{}, fn interface{}) *MetaKv_WalkWithPrefix_Call {
return &MetaKv_WalkWithPrefix_Call{Call: _e.mock.On("WalkWithPrefix", prefix, paginationSize, fn)}
}
......@@ -1076,7 +1076,7 @@ type MetaKv_Watch_Call struct {
}
// Watch is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) Watch(key interface{}) *MetaKv_Watch_Call {
return &MetaKv_Watch_Call{Call: _e.mock.On("Watch", key)}
}
......@@ -1115,7 +1115,7 @@ type MetaKv_WatchWithPrefix_Call struct {
}
// WatchWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *MetaKv_Expecter) WatchWithPrefix(key interface{}) *MetaKv_WatchWithPrefix_Call {
return &MetaKv_WatchWithPrefix_Call{Call: _e.mock.On("WatchWithPrefix", key)}
}
......@@ -1154,8 +1154,8 @@ type MetaKv_WatchWithRevision_Call struct {
}
// WatchWithRevision is a helper method to define mock.On call
// - key string
// - revision int64
// - key string
// - revision int64
func (_e *MetaKv_Expecter) WatchWithRevision(key interface{}, revision interface{}) *MetaKv_WatchWithRevision_Call {
return &MetaKv_WatchWithRevision_Call{Call: _e.mock.On("WatchWithRevision", key, revision)}
}
......
......@@ -44,8 +44,8 @@ type SnapShotKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) Load(key interface{}, ts interface{}) *SnapShotKV_Load_Call {
return &SnapShotKV_Load_Call{Call: _e.mock.On("Load", key, ts)}
}
......@@ -100,8 +100,8 @@ type SnapShotKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) LoadWithPrefix(key interface{}, ts interface{}) *SnapShotKV_LoadWithPrefix_Call {
return &SnapShotKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key, ts)}
}
......@@ -138,8 +138,8 @@ type SnapShotKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - ts uint64
// - kvs map[string]string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSave(kvs interface{}, ts interface{}) *SnapShotKV_MultiSave_Call {
return &SnapShotKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs, ts)}
}
......@@ -176,9 +176,9 @@ type SnapShotKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - ts uint64
// - saves map[string]string
// - removals []string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call {
return &SnapShotKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals, ts)}
}
......@@ -215,9 +215,9 @@ type SnapShotKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - ts uint64
// - key string
// - value string
// - ts uint64
func (_e *SnapShotKV_Expecter) Save(key interface{}, value interface{}, ts interface{}) *SnapShotKV_Save_Call {
return &SnapShotKV_Save_Call{Call: _e.mock.On("Save", key, value, ts)}
}
......
......@@ -71,7 +71,7 @@ type TxnKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Load(key interface{}) *TxnKV_Load_Call {
return &TxnKV_Load_Call{Call: _e.mock.On("Load", key)}
}
......@@ -126,7 +126,7 @@ type TxnKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) LoadWithPrefix(key interface{}) *TxnKV_LoadWithPrefix_Call {
return &TxnKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)}
}
......@@ -172,7 +172,7 @@ type TxnKV_MultiLoad_Call struct {
}
// MultiLoad is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiLoad(keys interface{}) *TxnKV_MultiLoad_Call {
return &TxnKV_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)}
}
......@@ -209,7 +209,7 @@ type TxnKV_MultiRemove_Call struct {
}
// MultiRemove is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemove(keys interface{}) *TxnKV_MultiRemove_Call {
return &TxnKV_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)}
}
......@@ -246,7 +246,7 @@ type TxnKV_MultiRemoveWithPrefix_Call struct {
}
// MultiRemoveWithPrefix is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemoveWithPrefix(keys interface{}) *TxnKV_MultiRemoveWithPrefix_Call {
return &TxnKV_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)}
}
......@@ -283,7 +283,7 @@ type TxnKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - kvs map[string]string
func (_e *TxnKV_Expecter) MultiSave(kvs interface{}) *TxnKV_MultiSave_Call {
return &TxnKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)}
}
......@@ -320,8 +320,8 @@ type TxnKV_MultiSaveAndRemove_Call struct {
}
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call {
return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
}
......@@ -358,8 +358,8 @@ type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
}
......@@ -396,7 +396,7 @@ type TxnKV_Remove_Call struct {
}
// Remove is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Remove(key interface{}) *TxnKV_Remove_Call {
return &TxnKV_Remove_Call{Call: _e.mock.On("Remove", key)}
}
......@@ -433,7 +433,7 @@ type TxnKV_RemoveWithPrefix_Call struct {
}
// RemoveWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) RemoveWithPrefix(key interface{}) *TxnKV_RemoveWithPrefix_Call {
return &TxnKV_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)}
}
......@@ -470,8 +470,8 @@ type TxnKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *TxnKV_Expecter) Save(key interface{}, value interface{}) *TxnKV_Save_Call {
return &TxnKV_Save_Call{Call: _e.mock.On("Save", key, value)}
}
......
......@@ -118,6 +118,8 @@ type DataCoordCatalog interface {
ListChannelCheckpoint(ctx context.Context) (map[string]*internalpb.MsgPosition, error)
SaveChannelCheckpoint(ctx context.Context, vChannel string, pos *internalpb.MsgPosition) error
DropChannelCheckpoint(ctx context.Context, vChannel string) error
GcConfirm(ctx context.Context, collectionID, partitionID typeutil.UniqueID) bool
}
type IndexCoordCatalog interface {
......
......@@ -23,11 +23,9 @@ import (
"strconv"
"strings"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
......@@ -38,6 +36,8 @@ import (
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var maxEtcdTxnNum = 128
......@@ -538,6 +538,23 @@ func (kc *Catalog) unmarshalBinlog(binlogType storage.BinlogType, collectionID,
return result, nil
}
const allPartitionID = -1
// GcConfirm returns true if related collection/partition is not found.
// DataCoord will remove all the meta eventually after GC is finished.
func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID typeutil.UniqueID) bool {
prefix := buildCollectionPrefix(collectionID)
if partitionID != allPartitionID {
prefix = buildPartitionPrefix(collectionID, partitionID)
}
keys, values, err := kc.MetaKv.LoadWithPrefix(prefix)
if err != nil {
// error case can be regarded as not finished.
return false
}
return len(keys) == 0 && len(values) == 0
}
func fillLogPathByLogID(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID,
segmentID typeutil.UniqueID, fieldBinlog *datapb.FieldBinlog) error {
for _, binlog := range fieldBinlog.Binlogs {
......@@ -796,3 +813,11 @@ func buildChannelRemovePath(channel string) string {
func buildChannelCPKey(vChannel string) string {
return fmt.Sprintf("%s/%s", ChannelCheckpointPrefix, vChannel)
}
func buildCollectionPrefix(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", SegmentPrefix, collectionID)
}
func buildPartitionPrefix(collectionID, partitionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d/%d", SegmentPrefix, collectionID, partitionID)
}
......@@ -791,3 +791,20 @@ func verifySavedKvsForDroppedSegment(t *testing.T, savedKvs map[string]string) {
assert.True(t, ok)
verifySegmentInfo2(t, []byte(ret))
}
func TestCatalog_GcConfirm(t *testing.T) {
kc := &Catalog{}
txn := mocks.NewMetaKv(t)
kc.MetaKv = txn
txn.On("LoadWithPrefix",
mock.AnythingOfType("string")).
Return(nil, nil, errors.New("error mock LoadWithPrefix")).
Once()
assert.False(t, kc.GcConfirm(context.TODO(), 100, 10000))
txn.On("LoadWithPrefix",
mock.AnythingOfType("string")).
Return(nil, nil, nil)
assert.True(t, kc.GcConfirm(context.TODO(), 100, 10000))
}
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
......
......@@ -311,6 +311,53 @@ func (_c *DataCoord_Flush_Call) Return(_a0 *datapb.FlushResponse, _a1 error) *Da
return _c
}
// GcConfirm provides a mock function with given fields: ctx, request
func (_m *DataCoord) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
ret := _m.Called(ctx, request)
var r0 *datapb.GcConfirmResponse
if rf, ok := ret.Get(0).(func(context.Context, *datapb.GcConfirmRequest) *datapb.GcConfirmResponse); ok {
r0 = rf(ctx, request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.GcConfirmResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *datapb.GcConfirmRequest) error); ok {
r1 = rf(ctx, request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_GcConfirm_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GcConfirm'
type DataCoord_GcConfirm_Call struct {
*mock.Call
}
// GcConfirm is a helper method to define mock.On call
// - ctx context.Context
// - request *datapb.GcConfirmRequest
func (_e *DataCoord_Expecter) GcConfirm(ctx interface{}, request interface{}) *DataCoord_GcConfirm_Call {
return &DataCoord_GcConfirm_Call{Call: _e.mock.On("GcConfirm", ctx, request)}
}
func (_c *DataCoord_GcConfirm_Call) Run(run func(ctx context.Context, request *datapb.GcConfirmRequest)) *DataCoord_GcConfirm_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.GcConfirmRequest))
})
return _c
}
func (_c *DataCoord_GcConfirm_Call) Return(_a0 *datapb.GcConfirmResponse, _a1 error) *DataCoord_GcConfirm_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// GetCollectionStatistics provides a mock function with given fields: ctx, req
func (_m *DataCoord) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
ret := _m.Called(ctx, req)
......
此差异已折叠。
......@@ -69,6 +69,8 @@ service DataCoord {
rpc BroadcastAlteredCollection(AlterCollectionRequest) returns (common.Status) {}
rpc CheckHealth(milvus.CheckHealthRequest) returns (milvus.CheckHealthResponse) {}
rpc GcConfirm(GcConfirmRequest) returns (GcConfirmResponse) {}
}
service DataNode {
......@@ -645,3 +647,13 @@ message AlterCollectionRequest {
repeated common.KeyDataPair start_positions = 4;
repeated common.KeyValuePair properties = 5;
}
message GcConfirmRequest {
int64 collection_id = 1;
int64 partition_id = 2; // -1 means whole collection.
}
message GcConfirmResponse {
common.Status status = 1;
bool gc_finished = 2;
}
\ No newline at end of file
......@@ -24,12 +24,15 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
)
type DataCoordMock struct {
types.DataCoord
nodeID typeutil.UniqueID
address string
......
......@@ -45,6 +45,7 @@ type Broker interface {
UnsetIsImportingState(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
MarkSegmentsDropped(context.Context, *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error)
GetSegmentStates(context.Context, *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool
DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
......@@ -282,3 +283,15 @@ func (b *ServerBroker) DescribeIndex(ctx context.Context, colID UniqueID) (*inde
CollectionID: colID,
})
}
func (b *ServerBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
req := &datapb.GcConfirmRequest{CollectionId: collectionID, PartitionId: partitionID}
resp, err := b.s.dataCoord.GcConfirm(ctx, req)
if err != nil {
return false
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return false
}
return resp.GetGcFinished()
}
......@@ -5,15 +5,15 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestServerBroker_ReleaseCollection(t *testing.T) {
......@@ -356,3 +356,42 @@ func TestServerBroker_BroadcastAlteredCollection(t *testing.T) {
assert.NoError(t, err)
})
}
func TestServerBroker_GcConfirm(t *testing.T) {
t.Run("invalid datacoord", func(t *testing.T) {
dc := mocks.NewDataCoord(t)
dc.On("GcConfirm",
mock.Anything, // context.Context
mock.Anything, // *datapb.GcConfirmRequest
).Return(nil, errors.New("error mock GcConfirm"))
c := newTestCore(withDataCoord(dc))
broker := newServerBroker(c)
assert.False(t, broker.GcConfirm(context.Background(), 100, 10000))
})
t.Run("non success", func(t *testing.T) {
dc := mocks.NewDataCoord(t)
dc.On("GcConfirm",
mock.Anything, // context.Context
mock.Anything, // *datapb.GcConfirmRequest
).Return(
&datapb.GcConfirmResponse{Status: failStatus(commonpb.ErrorCode_UnexpectedError, "error mock GcConfirm")},
nil)
c := newTestCore(withDataCoord(dc))
broker := newServerBroker(c)
assert.False(t, broker.GcConfirm(context.Background(), 100, 10000))
})
t.Run("normal case", func(t *testing.T) {
dc := mocks.NewDataCoord(t)
dc.On("GcConfirm",
mock.Anything, // context.Context
mock.Anything, // *datapb.GcConfirmRequest
).Return(
&datapb.GcConfirmResponse{Status: succStatus(), GcFinished: true},
nil)
c := newTestCore(withDataCoord(dc))
broker := newServerBroker(c)
assert.True(t, broker.GcConfirm(context.Background(), 100, 10000))
})
}
......@@ -91,6 +91,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: t.core},
pChannels: collMeta.PhysicalChannelNames,
})
redoTask.AddAsyncStep(newConfirmGCStep(t.core, collMeta.CollectionID, allPartition))
redoTask.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: collMeta.CollectionID,
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/milvus-io/milvus/internal/common"
......@@ -148,6 +149,9 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
t.Run("normal case, redo", func(t *testing.T) {
defer cleanTestEnv()
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
collectionName := funcutil.GenRandomStr()
shardNum := 2
......@@ -187,8 +191,12 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropIndexCalled = true
dropIndexChan <- struct{}{}
time.Sleep(confirmGCInterval)
return nil
}
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return true
}
gc := newMockGarbageCollector()
deleteCollectionCalled := false
......
......@@ -85,6 +85,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
CollectionID: t.collMeta.CollectionID,
},
})
redoTask.AddAsyncStep(newConfirmGCStep(t.core, t.collMeta.CollectionID, partID))
redoTask.AddAsyncStep(&removePartitionMetaStep{
baseStep: baseStep{core: t.core},
collectionID: t.collMeta.CollectionID,
......
......@@ -3,6 +3,7 @@ package rootcoord
import (
"context"
"testing"
"time"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
......@@ -125,6 +126,9 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
})
t.Run("normal case", func(t *testing.T) {
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
collectionName := funcutil.GenRandomStr()
partitionName := funcutil.GenRandomStr()
coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{{PartitionName: partitionName}}}
......@@ -146,10 +150,23 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition) (Timestamp, error) {
deletePartitionChan <- struct{}{}
deletePartitionCalled = true
time.Sleep(confirmGCInterval)
return 0, nil
}
core := newTestCore(withValidProxyManager(), withMeta(meta), withGarbageCollector(gc), withDropIndex())
broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return true
}
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
return nil
}
core := newTestCore(
withValidProxyManager(),
withMeta(meta),
withGarbageCollector(gc),
withBroker(broker))
task := &dropPartitionTask{
baseTask: baseTask{core: core},
......
......@@ -28,7 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/internalpb"
......@@ -38,29 +37,6 @@ import (
"go.uber.org/zap"
)
const (
// TimestampPrefix prefix for timestamp
TimestampPrefix = rootcoord.ComponentPrefix + "/timestamp"
// CreateCollectionDDType name of DD type for create collection
CreateCollectionDDType = "CreateCollection"
// DropCollectionDDType name of DD type for drop collection
DropCollectionDDType = "DropCollection"
// CreatePartitionDDType name of DD type for create partition
CreatePartitionDDType = "CreatePartition"
// DropPartitionDDType name of DD type for drop partition
DropPartitionDDType = "DropPartition"
// DefaultIndexType name of default index type for scalar field
DefaultIndexType = "STL_SORT"
// DefaultStringIndexType name of default index type for varChar/string field
DefaultStringIndexType = "Trie"
)
//go:generate mockery --name=IMetaTable --outpkg=mockrootcoord
type IMetaTable interface {
AddCollection(ctx context.Context, coll *model.Collection) error
......
......@@ -827,6 +827,8 @@ type mockBroker struct {
GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
BroadcastAlteredCollectionFunc func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error
GCConfirmFunc func(ctx context.Context, collectionID, partitionID UniqueID) bool
}
func newMockBroker() *mockBroker {
......@@ -861,6 +863,10 @@ func (b mockBroker) BroadcastAlteredCollection(ctx context.Context, req *milvusp
return b.BroadcastAlteredCollectionFunc(ctx, req)
}
func (b mockBroker) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID) bool {
return b.GCConfirmFunc(ctx, collectionID, partitionID)
}
func withBroker(b Broker) Opt {
return func(c *Core) {
c.broker = b
......
......@@ -3,6 +3,7 @@ package rootcoord
import (
"context"
"fmt"
"time"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
......@@ -14,10 +15,10 @@ import (
type stepPriority int
const (
stepPriorityLow = iota
stepPriorityNormal
stepPriorityImportant
stepPriorityUrgent
stepPriorityLow = 0
stepPriorityNormal = 1
stepPriorityImportant = 10
stepPriorityUrgent = 1000
)
type nestedStep interface {
......@@ -374,3 +375,50 @@ func (b *BroadcastAlteredCollectionStep) Execute(ctx context.Context) ([]nestedS
func (b *BroadcastAlteredCollectionStep) Desc() string {
return fmt.Sprintf("broadcast altered collection, collectionID: %d", b.req.CollectionID)
}
var (
confirmGCInterval = time.Minute * 20
allPartition UniqueID = -1
)
type confirmGCStep struct {
baseStep
collectionID UniqueID
partitionID UniqueID
lastScheduledTime time.Time
}
func newConfirmGCStep(core *Core, collectionID, partitionID UniqueID) *confirmGCStep {
return &confirmGCStep{
baseStep: baseStep{core: core},
collectionID: collectionID,
partitionID: partitionID,
lastScheduledTime: time.Now(),
}
}
func (b *confirmGCStep) Execute(ctx context.Context) ([]nestedStep, error) {
if time.Since(b.lastScheduledTime) < confirmGCInterval {
return nil, fmt.Errorf("wait for reschedule to confirm GC, collection: %d, partition: %d, last scheduled time: %s, now: %s",
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
}
finished := b.core.broker.GcConfirm(ctx, b.collectionID, b.partitionID)
if finished {
return nil, nil
}
b.lastScheduledTime = time.Now()
return nil, fmt.Errorf("GC is not finished, collection: %d, partition: %d, last scheduled time: %s, now: %s",
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
}
func (b *confirmGCStep) Desc() string {
return fmt.Sprintf("wait for GC finished, collection: %d, partition: %d, last scheduled time: %s, now: %s",
b.collectionID, b.partitionID, b.lastScheduledTime.String(), time.Now().String())
}
func (b *confirmGCStep) Weight() stepPriority {
return stepPriorityLow
}
......@@ -2,6 +2,7 @@ package rootcoord
import (
"context"
"sort"
"sync"
"time"
......@@ -25,14 +26,26 @@ type stepStack struct {
steps []nestedStep
}
func (s *stepStack) totalPriority() int {
total := 0
for _, step := range s.steps {
total += int(step.Weight())
}
return total
}
func (s *stepStack) Execute(ctx context.Context) *stepStack {
steps := s.steps
for len(steps) > 0 {
l := len(steps)
todo := steps[l-1]
childSteps, err := todo.Execute(ctx)
// TODO: maybe a interface `step.LogOnError` is better.
_, skipLog := todo.(*waitForTsSyncedStep)
_, isWaitForTsSyncedStep := todo.(*waitForTsSyncedStep)
_, isConfirmGCStep := todo.(*confirmGCStep)
skipLog := isWaitForTsSyncedStep || isConfirmGCStep
if retry.IsUnRecoverable(err) {
if !skipLog {
log.Warn("failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc()))
......@@ -76,8 +89,28 @@ func randomSelectPolicy(parallel int) selectStepPolicy {
}
}
func selectByPriority(parallel int, m map[*stepStack]struct{}) []*stepStack {
h := make([]*stepStack, 0, len(m))
for k := range m {
h = append(h, k)
}
sort.Slice(h, func(i, j int) bool {
return h[i].totalPriority() > h[j].totalPriority()
})
if len(h) <= parallel {
return h
}
return h[:parallel]
}
func selectByPriorityPolicy(parallel int) selectStepPolicy {
return func(m map[*stepStack]struct{}) []*stepStack {
return selectByPriority(parallel, m)
}
}
func defaultSelectPolicy() selectStepPolicy {
return randomSelectPolicy(defaultBgExecutingParallel)
return selectByPriorityPolicy(defaultBgExecutingParallel)
}
type bgOpt func(*bgStepExecutor)
......
......@@ -8,7 +8,6 @@ import (
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
)
......@@ -128,7 +127,7 @@ func Test_randomSelect(t *testing.T) {
func Test_bgStepExecutor_scheduleLoop(t *testing.T) {
bg := newBgStepExecutor(context.Background(),
withSelectStepPolicy(defaultSelectPolicy()),
withSelectStepPolicy(randomSelectPolicy(defaultBgExecutingParallel)),
withBgInterval(time.Millisecond*10))
bg.Start()
n := 20
......@@ -173,3 +172,43 @@ func Test_bgStepExecutor_scheduleLoop(t *testing.T) {
}
bg.Stop()
}
func Test_selectByPriorityPolicy(t *testing.T) {
policy := selectByPriorityPolicy(4)
t.Run("select all", func(t *testing.T) {
m := map[*stepStack]struct{}{
{steps: []nestedStep{}}: {},
{steps: []nestedStep{}}: {},
}
selected := policy(m)
assert.Equal(t, 2, len(selected))
})
t.Run("select by priority", func(t *testing.T) {
steps := []nestedStep{
&releaseCollectionStep{},
&releaseCollectionStep{},
&releaseCollectionStep{},
&releaseCollectionStep{},
&releaseCollectionStep{},
}
s1 := &stepStack{steps: steps[0:1]}
s2 := &stepStack{steps: steps[0:2]}
s3 := &stepStack{steps: steps[0:3]}
s4 := &stepStack{steps: steps[0:4]}
s5 := &stepStack{steps: steps[0:5]}
m := map[*stepStack]struct{}{
s1: {},
s2: {},
s3: {},
s4: {},
s5: {},
}
selected := policy(m)
assert.Equal(t, 4, len(selected))
for i := 1; i < len(selected); i++ {
assert.True(t, selected[i].totalPriority() <= selected[i-1].totalPriority())
}
})
}
......@@ -3,6 +3,7 @@ package rootcoord
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
......@@ -27,3 +28,54 @@ func Test_waitForTsSyncedStep_Execute(t *testing.T) {
assert.Equal(t, 0, len(children))
assert.NoError(t, err)
}
func restoreConfirmGCInterval() {
confirmGCInterval = time.Minute * 20
}
func Test_confirmGCStep_Execute(t *testing.T) {
t.Run("wait for reschedule", func(t *testing.T) {
confirmGCInterval = time.Minute * 1000
defer restoreConfirmGCInterval()
s := &confirmGCStep{lastScheduledTime: time.Now()}
_, err := s.Execute(context.TODO())
assert.Error(t, err)
})
t.Run("GC not finished", func(t *testing.T) {
broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return false
}
core := newTestCore(withBroker(broker))
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
s := newConfirmGCStep(core, 100, 1000)
time.Sleep(confirmGCInterval)
_, err := s.Execute(context.TODO())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
return true
}
core := newTestCore(withBroker(broker))
confirmGCInterval = time.Millisecond
defer restoreConfirmGCInterval()
s := newConfirmGCStep(core, 100, 1000)
time.Sleep(confirmGCInterval)
_, err := s.Execute(context.TODO())
assert.NoError(t, err)
})
}
......@@ -330,6 +330,8 @@ type DataCoord interface {
BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error)
CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)
GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error)
}
// DataCoordComponent defines the interface of DataCoord component.
......
......@@ -19,16 +19,17 @@ package mock
import (
"context"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"google.golang.org/grpc"
)
// DataCoordClient mocks of DataCoordClient
type DataCoordClient struct {
types.DataCoord
Err error
}
......
......@@ -34,6 +34,10 @@ type GrpcDataCoordClient struct {
Err error
}
func (m *GrpcDataCoordClient) GcConfirm(ctx context.Context, in *datapb.GcConfirmRequest, opts ...grpc.CallOption) (*datapb.GcConfirmResponse, error) {
return &datapb.GcConfirmResponse{}, m.Err
}
func (m *GrpcDataCoordClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.Err
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册