diff --git a/cmd/tools/migration/allocator/allocator.go b/cmd/tools/migration/allocator/allocator.go new file mode 100644 index 0000000000000000000000000000000000000000..ad2a6b473d9976232422514801f7be902297c4bd --- /dev/null +++ b/cmd/tools/migration/allocator/allocator.go @@ -0,0 +1,7 @@ +package allocator + +import "github.com/milvus-io/milvus/internal/util/typeutil" + +type Allocator interface { + AllocID() (typeutil.UniqueID, error) +} diff --git a/cmd/tools/migration/allocator/allocator_from_list.go b/cmd/tools/migration/allocator/allocator_from_list.go new file mode 100644 index 0000000000000000000000000000000000000000..a449a732a8f9c21832e08e35f19ad64a4ebe8726 --- /dev/null +++ b/cmd/tools/migration/allocator/allocator_from_list.go @@ -0,0 +1,27 @@ +package allocator + +import "sort" + +func makeClone(s []int64) []int64 { + clone := make([]int64, len(s)) + copy(clone, s) + return clone +} + +func NewAllocatorFromList(s []int64, useClone, deAsc bool) *AtomicAllocator { + initialized, delta := int64(defaultInitializedValue), int64(defaultDelta) + clone := s + if useClone { + clone = makeClone(s) + } + sort.Slice(clone, func(i, j int) bool { return clone[i] < clone[j] }) + l := len(clone) + if l == 0 { + // no change + } else if deAsc { + initialized, delta = clone[0], -1 + } else { + initialized, delta = clone[l-1], 1 + } + return NewAllocator(WithInitializedValue(initialized), WithDelta(delta)) +} diff --git a/cmd/tools/migration/allocator/allocator_from_list_test.go b/cmd/tools/migration/allocator/allocator_from_list_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4c21c9d34aad807810be31e02379f31771272f68 --- /dev/null +++ b/cmd/tools/migration/allocator/allocator_from_list_test.go @@ -0,0 +1,29 @@ +package allocator + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAllocatorFromList(t *testing.T) { + t.Run("de asc", func(t *testing.T) { + s := []int64{100000, 10000, 1000} + alloc := NewAllocatorFromList(s, true, true) + n := 100 + wg := &sync.WaitGroup{} + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + id, err := alloc.AllocID() + assert.NoError(t, err) + assert.True(t, id < 1000) + }() + } + wg.Wait() + assert.Equal(t, int64(-1), alloc.delta) + assert.Equal(t, int64(1000-n), alloc.now.Load()) + }) +} diff --git a/cmd/tools/migration/allocator/atomic_allocator.go b/cmd/tools/migration/allocator/atomic_allocator.go new file mode 100644 index 0000000000000000000000000000000000000000..5a8b353c7151d1c2a64f26f06b212aee14d00e2b --- /dev/null +++ b/cmd/tools/migration/allocator/atomic_allocator.go @@ -0,0 +1,50 @@ +package allocator + +import ( + "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/atomic" +) + +const ( + defaultInitializedValue = 0 + defaultDelta = 1 +) + +type AtomicAllocator struct { + now atomic.Int64 + delta int64 +} + +type Option func(allocator *AtomicAllocator) + +func WithInitializedValue(value int64) Option { + return func(allocator *AtomicAllocator) { + allocator.now.Store(value) + } +} + +func WithDelta(delta int64) Option { + return func(allocator *AtomicAllocator) { + allocator.delta = delta + } +} + +func (alloc *AtomicAllocator) apply(opts ...Option) { + for _, opt := range opts { + opt(alloc) + } +} + +func (alloc *AtomicAllocator) AllocID() (typeutil.UniqueID, error) { + id := alloc.now.Add(alloc.delta) + return id, nil +} + +func NewAllocator(opts ...Option) *AtomicAllocator { + alloc := &AtomicAllocator{ + now: *atomic.NewInt64(defaultInitializedValue), + delta: defaultDelta, + } + alloc.apply(opts...) + return alloc +} diff --git a/cmd/tools/migration/allocator/atomic_allocator_test.go b/cmd/tools/migration/allocator/atomic_allocator_test.go new file mode 100644 index 0000000000000000000000000000000000000000..0ec24f8d16a200a2d04a6e9a9cba5c75a4e8991d --- /dev/null +++ b/cmd/tools/migration/allocator/atomic_allocator_test.go @@ -0,0 +1,25 @@ +package allocator + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAtomicAllocator_AllocID(t *testing.T) { + n := 100 + alloc := NewAllocator() + wg := &sync.WaitGroup{} + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := alloc.AllocID() + assert.NoError(t, err) + }() + } + wg.Wait() + assert.Equal(t, int64(defaultDelta), alloc.delta) + assert.Equal(t, int64(defaultInitializedValue+n*defaultDelta), alloc.now.Load()) +} diff --git a/cmd/tools/migration/meta/210_to_220.go b/cmd/tools/migration/meta/210_to_220.go index 583f381f4fa3e4f9dee8aada109207d9aa1b4a83..c650907d09580490f8f9febf954753633838df90 100644 --- a/cmd/tools/migration/meta/210_to_220.go +++ b/cmd/tools/migration/meta/210_to_220.go @@ -6,6 +6,10 @@ import ( "strconv" "strings" + "github.com/milvus-io/milvus/cmd/tools/migration/legacy/legacypb" + + "github.com/milvus-io/milvus/cmd/tools/migration/allocator" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/cmd/tools/migration/versions" "github.com/milvus-io/milvus/internal/common" @@ -195,14 +199,45 @@ func combineToCollectionIndexesMeta220(fieldIndexes FieldIndexes210, collectionI return indexes, nil } +func getOrFillBuildMeta(record *pb.SegmentIndexInfo, indexBuildMeta IndexBuildMeta210, alloc allocator.Allocator) (*legacypb.IndexMeta, error) { + if record.GetBuildID() == 0 && !record.GetEnableIndex() { + buildID, err := alloc.AllocID() + if err != nil { + return nil, err + } + buildMeta := &legacypb.IndexMeta{ + IndexBuildID: buildID, + State: commonpb.IndexState_Finished, + FailReason: "", + Req: nil, + IndexFilePaths: nil, + MarkDeleted: false, + NodeID: 0, + IndexVersion: 1, // TODO: maybe a constraint is better. + Recycled: false, + SerializeSize: 0, + } + indexBuildMeta[buildID] = buildMeta + return buildMeta, nil + } + buildMeta, ok := indexBuildMeta[record.GetBuildID()] + if !ok { + return nil, fmt.Errorf("index build meta not found, segment id: %d, index id: %d, index build id: %d", + record.GetSegmentID(), record.GetIndexID(), record.GetBuildID()) + } + return buildMeta, nil +} + func combineToSegmentIndexesMeta220(segmentIndexes SegmentIndexesMeta210, indexBuildMeta IndexBuildMeta210) (SegmentIndexesMeta220, error) { + alloc := allocator.NewAllocatorFromList(indexBuildMeta.GetAllBuildIDs(), false, true) + segmentIndexModels := make(SegmentIndexesMeta220) for segID := range segmentIndexes { for indexID := range segmentIndexes[segID] { record := segmentIndexes[segID][indexID] - buildMeta, ok := indexBuildMeta[record.GetBuildID()] - if !ok { - return nil, fmt.Errorf("index build meta not found, segment id: %d, index id: %d, index build id: %d", segID, indexID, record.GetBuildID()) + buildMeta, err := getOrFillBuildMeta(record, indexBuildMeta, alloc) + if err != nil { + return nil, err } fileKeys := make([]string, len(buildMeta.GetIndexFilePaths())) diff --git a/cmd/tools/migration/meta/meta210.go b/cmd/tools/migration/meta/meta210.go index a5c6d6209c354383e49fc06eeb3503c8b93dd354..d4fcb1d261f258857089d687aa089c44736728d6 100644 --- a/cmd/tools/migration/meta/meta210.go +++ b/cmd/tools/migration/meta/meta210.go @@ -298,6 +298,14 @@ func (meta *IndexBuildMeta210) GenerateSaves() map[string]string { return kvs } +func (meta *IndexBuildMeta210) GetAllBuildIDs() []UniqueID { + ret := make([]UniqueID, 0, len(*meta)) + for buildID := range *meta { + ret = append(ret, buildID) + } + return ret +} + func (meta *FieldIndexes210) AddRecord(collectionID UniqueID, fieldIndexes []*pb.FieldIndexInfo, schema *schemapb.CollectionSchema) { (*meta)[collectionID] = &FieldIndexesWithSchema{indexes: fieldIndexes, schema: schema} }