未验证 提交 051d8027 编写于 作者: Z zhenshan.cao 提交者: GitHub

Revert "Restrain gc files by collection (#22504) (#22518)" (#23382)

This reverts commit a58abe16.
Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 9d5d54ff
......@@ -19,7 +19,6 @@ package datacoord
import (
"context"
"path"
"strconv"
"strings"
"sync"
"time"
......@@ -42,8 +41,6 @@ const (
deltaLogPrefix = `delta_log`
)
type collectionValidator func(int64) bool
// GcOption garbage collection options
type GcOption struct {
cli storage.ChunkManager // client
......@@ -51,7 +48,6 @@ type GcOption struct {
checkInterval time.Duration // each interval
missingTolerance time.Duration // key missing in meta tolerance time
dropTolerance time.Duration // dropped segment related key tolerance time
collValidator collectionValidator // validates collection id
}
// garbageCollector handles garbage files in object storage
......@@ -113,24 +109,6 @@ func (gc *garbageCollector) work() {
}
}
func (gc *garbageCollector) isCollectionPrefixValid(p string, prefix string) bool {
if gc.option.collValidator == nil {
return true
}
if !strings.HasPrefix(p, prefix) {
return false
}
p = strings.Trim(p[len(prefix):], "/")
collectionID, err := strconv.ParseInt(p, 10, 64)
if err != nil {
return false
}
return gc.option.collValidator(collectionID)
}
func (gc *garbageCollector) close() {
gc.stopOnce.Do(func() {
close(gc.closeCh)
......@@ -168,69 +146,51 @@ func (gc *garbageCollector) scan() {
var removedKeys []string
for _, prefix := range prefixes {
// list first level prefix, then perform collection id validation
collectionPrefixes, _, err := gc.option.cli.ListWithPrefix(ctx, prefix+"/", false)
infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true)
if err != nil {
log.Warn("failed to list collection prefix",
log.Error("failed to list files with prefix",
zap.String("prefix", prefix),
zap.Error(err),
zap.String("error", err.Error()),
)
}
for _, collPrefix := range collectionPrefixes {
if !gc.isCollectionPrefixValid(collPrefix, prefix) {
log.Warn("garbage collector meet invalid collection prefix, ignore it",
zap.String("collPrefix", collPrefix),
zap.String("prefix", prefix),
)
for i, infoKey := range infoKeys {
total++
_, has := filesMap[infoKey]
if has {
valid++
continue
}
infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, collPrefix, true)
segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
if err != nil {
log.Error("failed to list files with collPrefix",
zap.String("collPrefix", collPrefix),
zap.String("error", err.Error()),
)
missing++
log.Warn("parse segment id error",
zap.String("infoKey", infoKey),
zap.Error(err))
continue
}
for i, infoKey := range infoKeys {
total++
_, has := filesMap[infoKey]
if has {
valid++
continue
}
if gc.segRefer.HasSegmentLock(segmentID) {
valid++
continue
}
if strings.Contains(prefix, statsLogPrefix) &&
segmentMap.Contain(segmentID) {
valid++
continue
}
segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
// not found in meta, check last modified time exceeds tolerance duration
if time.Since(modTimes[i]) > gc.option.missingTolerance {
// ignore error since it could be cleaned up next time
removedKeys = append(removedKeys, infoKey)
err = gc.option.cli.Remove(ctx, infoKey)
if err != nil {
missing++
log.Warn("parse segment id error",
log.Error("failed to remove object",
zap.String("infoKey", infoKey),
zap.Error(err))
continue
}
if gc.segRefer.HasSegmentLock(segmentID) {
valid++
continue
}
if strings.Contains(prefix, statsLogPrefix) &&
segmentMap.Contain(segmentID) {
valid++
continue
}
// not found in meta, check last modified time exceeds tolerance duration
if time.Since(modTimes[i]) > gc.option.missingTolerance {
// ignore error since it could be cleaned up next time
removedKeys = append(removedKeys, infoKey)
err = gc.option.cli.Remove(ctx, infoKey)
if err != nil {
missing++
log.Error("failed to remove object",
zap.String("infoKey", infoKey),
zap.Error(err))
}
}
}
}
......
......@@ -19,7 +19,6 @@ package datacoord
import (
"bytes"
"context"
"errors"
"path"
"strconv"
"strings"
......@@ -28,7 +27,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
kvmocks "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
......@@ -36,170 +34,10 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
type GarbageCollectorSuite struct {
suite.Suite
mockChunkManager *mocks.ChunkManager
gc *garbageCollector
}
func (s *GarbageCollectorSuite) SetupTest() {
meta, err := newMemoryMeta()
s.Require().NoError(err)
s.mockChunkManager = &mocks.ChunkManager{}
mockKV := &kvmocks.TxnKV{}
mockKV.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, nil)
segRefer, err := NewSegmentReferenceManager(mockKV, nil)
s.Require().NoError(err)
s.Require().NotNil(segRefer)
s.gc = newGarbageCollector(
meta, newMockHandler(), segRefer, &mocks.MockIndexCoord{}, GcOption{
cli: s.mockChunkManager,
enabled: true,
checkInterval: time.Millisecond * 10,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
},
)
}
func (s *GarbageCollectorSuite) TearDownTest() {
s.mockChunkManager = nil
s.gc.close()
s.gc = nil
}
func (s *GarbageCollectorSuite) TestBasicOperation() {
s.Run("normal_gc", func() {
gc := s.gc
s.mockChunkManager.EXPECT().RootPath().Return("files")
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
Return([]string{}, []time.Time{}, nil)
gc.start()
// make ticker run at least once
time.Sleep(time.Millisecond * 20)
s.NotPanics(func() {
gc.close()
})
})
s.Run("nil_client", func() {
// initial a new garbageCollector here
mockKV := &kvmocks.TxnKV{}
mockKV.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, nil)
segRefer, err := NewSegmentReferenceManager(mockKV, nil)
s.Require().NoError(err)
gc := newGarbageCollector(nil, newMockHandler(), segRefer, &mocks.MockIndexCoord{}, GcOption{
cli: nil,
enabled: true,
})
s.NotPanics(func() {
gc.start()
})
s.NotPanics(func() {
gc.close()
})
})
}
func (s *GarbageCollectorSuite) TestScan() {
s.Run("listCollectionPrefix_fails", func() {
s.mockChunkManager.ExpectedCalls = nil
s.mockChunkManager.EXPECT().RootPath().Return("files")
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
Return(nil, nil, errors.New("mocked"))
s.gc.scan()
s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
})
s.Run("collectionPrefix_invalid", func() {
s.mockChunkManager.ExpectedCalls = nil
s.mockChunkManager.EXPECT().RootPath().Return("files")
/*
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
Return([]string{"files/insert_log/1/", "files/bad_prefix", "files/insert_log/string/"}, lo.RepeatBy(3, func(_ int) time.Time {
return time.Now().Add(-time.Hour)
}), nil)*/
logTypes := []string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}
for _, logType := range logTypes {
validSubPath := "1/2/3/100/2000"
if logType == "files/delta_log/" {
validSubPath = "1/2/3/2000"
}
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, logType, false).
Return([]string{path.Join(logType, "1") + "/", path.Join(logType, "2") + "/", path.Join(logType, "string") + "/", "files/badprefix/"}, lo.RepeatBy(4, func(_ int) time.Time { return time.Now() }), nil)
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, path.Join(logType, "1")+"/", true).
Return([]string{path.Join(logType, validSubPath)}, []time.Time{time.Now().Add(time.Hour * -48)}, nil)
s.mockChunkManager.EXPECT().Remove(mock.Anything, path.Join(logType, validSubPath)).Return(nil)
}
s.gc.option.collValidator = func(collID int64) bool {
return collID == 1
}
s.gc.scan()
//s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
s.mockChunkManager.AssertExpectations(s.T())
})
s.Run("fileScan_fails", func() {
s.mockChunkManager.ExpectedCalls = nil
s.mockChunkManager.Calls = nil
s.mockChunkManager.EXPECT().RootPath().Return("files")
isCollPrefix := func(prefix string) bool {
return lo.Contains([]string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}, prefix)
}
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).Call.Return(
func(_ context.Context, prefix string, recursive bool) []string {
if isCollPrefix(prefix) {
return []string{path.Join(prefix, "1")}
}
return nil
},
func(_ context.Context, prefix string, recursive bool) []time.Time {
if isCollPrefix(prefix) {
return []time.Time{time.Now()}
}
return nil
},
func(_ context.Context, prefix string, recursive bool) error {
if isCollPrefix(prefix) {
return nil
}
return errors.New("mocked")
},
)
s.gc.option.collValidator = func(collID int64) bool {
return true
}
s.gc.scan()
s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
})
}
func TestGarbageCollectorSuite(t *testing.T) {
suite.Run(t, new(GarbageCollectorSuite))
}
/*
func Test_garbageCollector_basic(t *testing.T) {
bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
rootPath := `gc` + funcutil.RandomString(8)
......@@ -259,7 +97,7 @@ func Test_garbageCollector_basic(t *testing.T) {
})
})
}*/
}
func validateMinioPrefixElements(t *testing.T, cli *minio.Client, bucketName string, prefix string, elements []string) {
var current []string
......
......@@ -416,18 +416,6 @@ func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
checkInterval: Params.DataCoordCfg.GCInterval,
missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
dropTolerance: Params.DataCoordCfg.GCDropTolerance,
collValidator: func(collID int64) bool {
resp, err := s.rootCoordClient.DescribeCollectionInternal(context.Background(), &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
),
CollectionID: collID,
})
if err != nil {
log.Warn("failed to check collection id", zap.Int64("collID", collID), zap.Error(err))
}
return resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success
},
})
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册